You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/11/17 09:59:47 UTC
camel git commit: CAMEL-10489: Camel-Nats: Add Flush option with timeout
Repository: camel
Updated Branches:
refs/heads/master 06048c832 -> cb5617e0d
CAMEL-10489: Camel-Nats: Add Flush option with timeout
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb5617e0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb5617e0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb5617e0
Branch: refs/heads/master
Commit: cb5617e0d94288b8597ff23cbe69ddd72d86c999
Parents: 06048c8
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu Nov 17 10:59:11 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Nov 17 10:59:11 2016 +0100
----------------------------------------------------------------------
.../src/main/docs/nats-component.adoc | 4 ++-
.../camel/component/nats/NatsConfiguration.java | 26 ++++++++++++++++++++
.../camel/component/nats/NatsConsumer.java | 6 +++--
.../camel/component/nats/NatsProducer.java | 4 +++
.../camel/component/nats/NatsConsumerTest.java | 4 +--
5 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/docs/nats-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 2c0839c..3965ef7 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -44,13 +44,15 @@ The Nats component has no options.
// endpoint options: START
-The Nats component supports 21 endpoint options which are listed below:
+The Nats component supports 23 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
|=======================================================================
| Name | Group | Default | Java Type | Description
| servers | common | | String | *Required* URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers.
+| flushConnection | common | false | boolean | Define if we want to flush connection or not
+| flushTimeout | common | 1000 | int | Set the flush timeout
| maxReconnectAttempts | common | 3 | int | Max reconnection attempts
| noRandomizeServers | common | false | boolean | Whether or not randomizing the order of servers for the connection attempts
| pedantic | common | false | boolean | Whether or not running in pedantic mode (this affects performace)
http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 641b24b..dab069e 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -57,6 +57,10 @@ public class NatsConfiguration {
private String maxMessages;
@UriParam(label = "consumer", defaultValue = "10")
private int poolSize = 10;
+ @UriParam(label = "common", defaultValue = "false")
+ private boolean flushConnection;
+ @UriParam(label = "common", defaultValue = "1000")
+ private int flushTimeout = 1000;
@UriParam(label = "security")
private boolean secure;
@UriParam(label = "security")
@@ -219,6 +223,28 @@ public class NatsConfiguration {
this.poolSize = poolSize;
}
+ public boolean isFlushConnection() {
+ return flushConnection;
+ }
+
+ /**
+ * Define if we want to flush connection or not
+ */
+ public void setFlushConnection(boolean flushConnection) {
+ this.flushConnection = flushConnection;
+ }
+
+ public int getFlushTimeout() {
+ return flushTimeout;
+ }
+
+ /**
+ * Set the flush timeout
+ */
+ public void setFlushTimeout(int flushTimeout) {
+ this.flushTimeout = flushTimeout;
+ }
+
/**
* Set secure option indicating TLS is required
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 37e5171..95bc0e3 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -73,8 +73,10 @@ public class NatsConsumer extends DefaultConsumer {
protected void doStop() throws Exception {
super.doStop();
- LOG.debug("Flushing Messages before stopping");
- connection.flush();
+ if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+ LOG.debug("Flushing Messages before stopping");
+ connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+ }
try {
sid.unsubscribe();
http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 0efafe2..1be13e3 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -78,6 +78,10 @@ public class NatsProducer extends DefaultProducer {
LOG.debug("Closing Nats Connection");
if (connection != null && !connection.isClosed()) {
+ if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+ LOG.debug("Flushing Nats Connection");
+ connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+ }
connection.close();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index ca63048..24d4877 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -45,8 +45,8 @@ public class NatsConsumerTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:send").to("nats://localhost:4222?topic=test");
- from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
+ from("direct:send").to("nats://localhost:4222?topic=test&flushConnection=true");
+ from("nats://localhost:4222?topic=test&flushConnection=true").to(mockResultEndpoint);
}
};
}