You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/23 13:30:39 UTC
[camel] 01/02: CAMEL-12675 - Camel-Nats: Add a way to provide a
connection to Nats server already instantiated
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit c81e203a8887f01a43350f3182506bda03d65376
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jul 23 15:16:44 2018 +0200
CAMEL-12675 - Camel-Nats: Add a way to provide a connection to Nats server already instantiated
---
.../camel-nats/src/main/docs/nats-component.adoc | 3 ++-
.../camel/component/nats/NatsConfiguration.java | 14 ++++++++++++++
.../apache/camel/component/nats/NatsConsumer.java | 13 ++++++++-----
.../apache/camel/component/nats/NatsProducer.java | 20 +++++++++++---------
4 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 2f19c59..a3e4ebf 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -66,12 +66,13 @@ with the following path and query parameters:
|===
-==== Query Parameters (24 parameters):
+==== Query Parameters (25 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
+| *connection* (common) | Reference an already instantiated connection to Nats server | | Connection
| *connectionTimeout* (common) | Timeout for connection attempts. (in milliseconds) | 2000 | int
| *flushConnection* (common) | Define if we want to flush connection or not | false | boolean
| *flushTimeout* (common) | Set the flush timeout (in milliseconds) | 1000 | int
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 481d66b..2442a37 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.nats;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
+import io.nats.client.Connection;
import io.nats.client.Options;
import io.nats.client.Options.Builder;
@@ -37,6 +38,8 @@ public class NatsConfiguration {
@UriParam
@Metadata(required = "true")
private String topic;
+ @UriParam
+ private Connection connection;
@UriParam(defaultValue = "true")
private boolean reconnect = true;
@UriParam
@@ -97,6 +100,17 @@ public class NatsConfiguration {
public void setTopic(String topic) {
this.topic = topic;
}
+
+ /**
+ * Reference an already instantiated connection to Nats server
+ */
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
/**
* Whether or not using reconnection feature
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index df81dc5..e31924c 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -66,14 +66,14 @@ public class NatsConsumer extends DefaultConsumer {
executor = getEndpoint().createExecutor();
LOG.debug("Getting Nats Connection");
- connection = getEndpoint().getConnection();
+ connection = getEndpoint().getNatsConfiguration().getConnection() != null ?
+ getEndpoint().getNatsConfiguration().getConnection():getEndpoint().getConnection();
executor.submit(new NatsConsumingTask(connection, getEndpoint().getNatsConfiguration()));
}
@Override
protected void doStop() throws Exception {
- super.doStop();
if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
LOG.debug("Flushing Messages before stopping");
@@ -96,10 +96,13 @@ public class NatsConsumer extends DefaultConsumer {
}
executor = null;
- LOG.debug("Closing Nats Connection");
- if (!connection.getStatus().equals(Status.CLOSED)) {
- connection.close();
+ if (ObjectHelper.isEmpty(getEndpoint().getNatsConfiguration().getConnection())) {
+ LOG.debug("Closing Nats Connection");
+ if (!connection.getStatus().equals(Status.CLOSED)) {
+ connection.close();
+ }
}
+ super.doStop();
}
public boolean isActive() {
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 844f65f..1812f60 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -70,22 +70,24 @@ public class NatsProducer extends DefaultProducer {
LOG.debug("Starting Nats Producer");
LOG.debug("Getting Nats Connection");
- connection = getEndpoint().getConnection();
+ connection = getEndpoint().getNatsConfiguration().getConnection() != null ?
+ getEndpoint().getNatsConfiguration().getConnection():getEndpoint().getConnection();
}
@Override
protected void doStop() throws Exception {
- super.doStop();
LOG.debug("Stopping Nats Producer");
-
- LOG.debug("Closing Nats Connection");
- if (connection != null && !connection.getStatus().equals(Status.CLOSED)) {
- if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
- LOG.debug("Flushing Nats Connection");
- connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
+ if (ObjectHelper.isEmpty(getEndpoint().getNatsConfiguration().getConnection())) {
+ LOG.debug("Closing Nats Connection");
+ if (connection != null && !connection.getStatus().equals(Status.CLOSED)) {
+ if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+ LOG.debug("Flushing Nats Connection");
+ connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
+ }
+ connection.close();
}
- connection.close();
}
+ super.doStop();
}
}