You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/23 13:30:39 UTC

[camel] 01/02: CAMEL-12675 - Camel-Nats: Add a way to provide a connection to Nats server already instantiated

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c81e203a8887f01a43350f3182506bda03d65376
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jul 23 15:16:44 2018 +0200

    CAMEL-12675 - Camel-Nats: Add a way to provide a connection to Nats server already instantiated
---
 .../camel-nats/src/main/docs/nats-component.adoc     |  3 ++-
 .../camel/component/nats/NatsConfiguration.java      | 14 ++++++++++++++
 .../apache/camel/component/nats/NatsConsumer.java    | 13 ++++++++-----
 .../apache/camel/component/nats/NatsProducer.java    | 20 +++++++++++---------
 4 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 2f19c59..a3e4ebf 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -66,12 +66,13 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (24 parameters):
+==== Query Parameters (25 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *connection* (common) | Reference an already instantiated connection to Nats server |  | Connection
 | *connectionTimeout* (common) | Timeout for connection attempts. (in milliseconds) | 2000 | int
 | *flushConnection* (common) | Define if we want to flush connection or not | false | boolean
 | *flushTimeout* (common) | Set the flush timeout (in milliseconds) | 1000 | int
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 481d66b..2442a37 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.nats;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 
+import io.nats.client.Connection;
 import io.nats.client.Options;
 import io.nats.client.Options.Builder;
 
@@ -37,6 +38,8 @@ public class NatsConfiguration {
     @UriParam
     @Metadata(required = "true")
     private String topic;
+    @UriParam
+    private Connection connection;
     @UriParam(defaultValue = "true")
     private boolean reconnect = true;
     @UriParam
@@ -97,6 +100,17 @@ public class NatsConfiguration {
     public void setTopic(String topic) {
         this.topic = topic;
     }
+    
+    /**
+     * Reference an already instantiated connection to Nats server
+     */  
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
 
     /**
      * Whether or not using reconnection feature
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index df81dc5..e31924c 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -66,14 +66,14 @@ public class NatsConsumer extends DefaultConsumer {
         executor = getEndpoint().createExecutor();
 
         LOG.debug("Getting Nats Connection");
-        connection = getEndpoint().getConnection();
+        connection = getEndpoint().getNatsConfiguration().getConnection() != null ? 
+            getEndpoint().getNatsConfiguration().getConnection():getEndpoint().getConnection();
 
         executor.submit(new NatsConsumingTask(connection, getEndpoint().getNatsConfiguration()));
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
 
         if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
             LOG.debug("Flushing Messages before stopping");
@@ -96,10 +96,13 @@ public class NatsConsumer extends DefaultConsumer {
         }
         executor = null;
 
-        LOG.debug("Closing Nats Connection");
-        if (!connection.getStatus().equals(Status.CLOSED)) {
-            connection.close();
+        if (ObjectHelper.isEmpty(getEndpoint().getNatsConfiguration().getConnection())) {
+            LOG.debug("Closing Nats Connection");
+            if (!connection.getStatus().equals(Status.CLOSED)) {
+                connection.close();
+            }
         }
+        super.doStop();
     }
 
     public boolean isActive() {
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 844f65f..1812f60 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -70,22 +70,24 @@ public class NatsProducer extends DefaultProducer {
         LOG.debug("Starting Nats Producer");
         
         LOG.debug("Getting Nats Connection");
-        connection = getEndpoint().getConnection();
+        connection = getEndpoint().getNatsConfiguration().getConnection() != null ? 
+            getEndpoint().getNatsConfiguration().getConnection():getEndpoint().getConnection();
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
         LOG.debug("Stopping Nats Producer");
-        
-        LOG.debug("Closing Nats Connection");
-        if (connection != null && !connection.getStatus().equals(Status.CLOSED)) {
-            if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
-                LOG.debug("Flushing Nats Connection");
-                connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
+        if (ObjectHelper.isEmpty(getEndpoint().getNatsConfiguration().getConnection())) {
+            LOG.debug("Closing Nats Connection");
+            if (connection != null && !connection.getStatus().equals(Status.CLOSED)) {
+                if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+                    LOG.debug("Flushing Nats Connection");
+                    connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
+                }
+                connection.close();
             }
-            connection.close();
         }
+        super.doStop();
     }
 
 }