You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/10/17 10:22:30 UTC

[2/3] git commit: CAMEL-7922 start the MQTT connection when consumer or producer is started

CAMEL-7922 start the MQTT connection when consumer or producer is started


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/feea16ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/feea16ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/feea16ad

Branch: refs/heads/master
Commit: feea16ada605803f81945db55b2344f2b000f0b5
Parents: 73f69d8
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Oct 17 14:33:39 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Oct 17 14:33:39 2014 +0800

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTConsumer.java      |  3 ++
 .../camel/component/mqtt/MQTTEndpoint.java      | 47 +++++++++++++-------
 .../camel/component/mqtt/MQTTProducer.java      |  9 ++++
 3 files changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
index 934e419..449a767 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java
@@ -33,6 +33,9 @@ public class MQTTConsumer extends DefaultConsumer {
 
     protected void doStart() throws Exception {
         getEndpoint().addConsumer(this);
+        if (!getEndpoint().isConnected()) {
+            getEndpoint().connect();
+        }
         super.doStart();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 651049c..07014ad 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -44,6 +44,7 @@ public class MQTTEndpoint extends DefaultEndpoint {
 
     private CallbackConnection connection;
     private final MQTTConfiguration configuration;
+    private volatile boolean connected;
     private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>();
 
     public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) {
@@ -107,6 +108,27 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
         });
 
+        
+    }
+
+    protected void doStop() throws Exception {
+        if (connection != null) {
+            final Promise<Void> promise = new Promise<Void>();
+            connection.disconnect(new Callback<Void>() {
+                public void onSuccess(Void value) {
+                    promise.onSuccess(value);
+                }
+
+                public void onFailure(Throwable value) {
+                    promise.onFailure(value);
+                }
+            });
+            promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
+        }
+        super.doStop();
+    }
+    
+    void connect() throws Exception {
         final Promise<Object> promise = new Promise<Object>();
         connection.connect(new Callback<Void>() {
             public void onSuccess(Void value) {
@@ -118,15 +140,18 @@ public class MQTTEndpoint extends DefaultEndpoint {
                     connection.subscribe(topics, new Callback<byte[]>() {
                         public void onSuccess(byte[] value) {
                             promise.onSuccess(value);
+                            connected = true;
                         }
 
                         public void onFailure(Throwable value) {
                             promise.onFailure(value);
                             connection.disconnect(null);
+                            connected = false;
                         }
                     });
                 } else {
                     promise.onSuccess(value);
+                    connected = true;
                 }
 
             }
@@ -134,28 +159,16 @@ public class MQTTEndpoint extends DefaultEndpoint {
             public void onFailure(Throwable value) {
                 promise.onFailure(value);
                 connection.disconnect(null);
+                connected = false;
             }
         });
         promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
     }
-
-    protected void doStop() throws Exception {
-        if (connection != null) {
-            final Promise<Void> promise = new Promise<Void>();
-            connection.disconnect(new Callback<Void>() {
-                public void onSuccess(Void value) {
-                    promise.onSuccess(value);
-                }
-
-                public void onFailure(Throwable value) {
-                    promise.onFailure(value);
-                }
-            });
-            promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
-        }
-        super.doStop();
+    
+    boolean isConnected() {
+        return connected;
     }
-
+ 
     void publish(String topic, byte[] payload, QoS qoS, boolean retain, Callback<Void> callback) throws Exception {
         connection.publish(topic, payload, qoS, retain, callback);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
index 59ff90b..751453a 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java
@@ -31,6 +31,15 @@ public class MQTTProducer extends DefaultAsyncProducer implements Processor {
         super(mqttEndpoint);
         this.mqttEndpoint = mqttEndpoint;
     }
+    
+    protected void doStart() throws Exception {
+        // check the mqttEndpoint connection when it is started
+        if (!mqttEndpoint.isConnected()) {
+            mqttEndpoint.connect();
+        }
+        super.doStart();
+    }
+
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {