You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/10/17 10:22:30 UTC
[2/3] git commit: CAMEL-7922 start the MQTT connection when consumer
or producer is started
CAMEL-7922 start the MQTT connection when consumer or producer is started
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/feea16ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/feea16ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/feea16ad
Branch: refs/heads/master
Commit: feea16ada605803f81945db55b2344f2b000f0b5
Parents: 73f69d8
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Oct 17 14:33:39 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Oct 17 14:33:39 2014 +0800
----------------------------------------------------------------------
.../camel/component/mqtt/MQTTConsumer.java | 3 ++
.../camel/component/mqtt/MQTTEndpoint.java | 47 +++++++++++++-------
.../camel/component/mqtt/MQTTProducer.java | 9 ++++
3 files changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
index 934e419..449a767 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
@@ -33,6 +33,9 @@ public class MQTTConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
getEndpoint().addConsumer(this);
+ if (!getEndpoint().isConnected()) {
+ getEndpoint().connect();
+ }
super.doStart();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 651049c..07014ad 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -44,6 +44,7 @@ public class MQTTEndpoint extends DefaultEndpoint {
private CallbackConnection connection;
private final MQTTConfiguration configuration;
+ private volatile boolean connected;
private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>();
public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) {
@@ -107,6 +108,27 @@ public class MQTTEndpoint extends DefaultEndpoint {
}
});
+
+ }
+
+ protected void doStop() throws Exception {
+ if (connection != null) {
+ final Promise<Void> promise = new Promise<Void>();
+ connection.disconnect(new Callback<Void>() {
+ public void onSuccess(Void value) {
+ promise.onSuccess(value);
+ }
+
+ public void onFailure(Throwable value) {
+ promise.onFailure(value);
+ }
+ });
+ promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
+ }
+ super.doStop();
+ }
+
+ void connect() throws Exception {
final Promise<Object> promise = new Promise<Object>();
connection.connect(new Callback<Void>() {
public void onSuccess(Void value) {
@@ -118,15 +140,18 @@ public class MQTTEndpoint extends DefaultEndpoint {
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] value) {
promise.onSuccess(value);
+ connected = true;
}
public void onFailure(Throwable value) {
promise.onFailure(value);
connection.disconnect(null);
+ connected = false;
}
});
} else {
promise.onSuccess(value);
+ connected = true;
}
}
@@ -134,28 +159,16 @@ public class MQTTEndpoint extends DefaultEndpoint {
public void onFailure(Throwable value) {
promise.onFailure(value);
connection.disconnect(null);
+ connected = false;
}
});
promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
}
-
- protected void doStop() throws Exception {
- if (connection != null) {
- final Promise<Void> promise = new Promise<Void>();
- connection.disconnect(new Callback<Void>() {
- public void onSuccess(Void value) {
- promise.onSuccess(value);
- }
-
- public void onFailure(Throwable value) {
- promise.onFailure(value);
- }
- });
- promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
- }
- super.doStop();
+
+ boolean isConnected() {
+ return connected;
}
-
+
void publish(String topic, byte[] payload, QoS qoS, boolean retain, Callback<Void> callback) throws Exception {
connection.publish(topic, payload, qoS, retain, callback);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
index 59ff90b..751453a 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
@@ -31,6 +31,15 @@ public class MQTTProducer extends DefaultAsyncProducer implements Processor {
super(mqttEndpoint);
this.mqttEndpoint = mqttEndpoint;
}
+
+ protected void doStart() throws Exception {
+ // check the mqttEndpoint connection when it is started
+ if (!mqttEndpoint.isConnected()) {
+ mqttEndpoint.connect();
+ }
+ super.doStart();
+ }
+
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {