You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/11/17 09:59:47 UTC

camel git commit: CAMEL-10489: Camel-Nats: Add Flush option with timeout

Repository: camel
Updated Branches:
  refs/heads/master 06048c832 -> cb5617e0d


CAMEL-10489: Camel-Nats: Add Flush option with timeout


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb5617e0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb5617e0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb5617e0

Branch: refs/heads/master
Commit: cb5617e0d94288b8597ff23cbe69ddd72d86c999
Parents: 06048c8
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu Nov 17 10:59:11 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Nov 17 10:59:11 2016 +0100

----------------------------------------------------------------------
 .../src/main/docs/nats-component.adoc           |  4 ++-
 .../camel/component/nats/NatsConfiguration.java | 26 ++++++++++++++++++++
 .../camel/component/nats/NatsConsumer.java      |  6 +++--
 .../camel/component/nats/NatsProducer.java      |  4 +++
 .../camel/component/nats/NatsConsumerTest.java  |  4 +--
 5 files changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/docs/nats-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 2c0839c..3965ef7 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -44,13 +44,15 @@ The Nats component has no options.
 
 
 // endpoint options: START
-The Nats component supports 21 endpoint options which are listed below:
+The Nats component supports 23 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | servers | common |  | String | *Required* URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers.
+| flushConnection | common | false | boolean | Whether to flush the connection before the consumer is stopped or the producer connection is closed
+| flushTimeout | common | 1000 | int | Set the flush timeout (in milliseconds)
 | maxReconnectAttempts | common | 3 | int | Max reconnection attempts
 | noRandomizeServers | common | false | boolean | Whether or not randomizing the order of servers for the connection attempts
 | pedantic | common | false | boolean | Whether or not running in pedantic mode (this affects performace)

http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 641b24b..dab069e 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -57,6 +57,10 @@ public class NatsConfiguration {
     private String maxMessages;
     @UriParam(label = "consumer", defaultValue = "10")
     private int poolSize = 10;
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean flushConnection;
+    @UriParam(label = "common", defaultValue = "1000")
+    private int flushTimeout = 1000;
     @UriParam(label = "security")
     private boolean secure;
     @UriParam(label = "security")
@@ -219,6 +223,28 @@ public class NatsConfiguration {
         this.poolSize = poolSize;
     }
 
+    public boolean isFlushConnection() {
+        return flushConnection;
+    }
+
+    /**
+     * Whether to flush the connection before the consumer is stopped or the producer connection is closed
+     */
+    public void setFlushConnection(boolean flushConnection) {
+        this.flushConnection = flushConnection;
+    }
+
+    public int getFlushTimeout() {
+        return flushTimeout;
+    }
+
+    /**
+     * Set the flush timeout (in milliseconds)
+     */
+    public void setFlushTimeout(int flushTimeout) {
+        this.flushTimeout = flushTimeout;
+    }
+
     /**
      * Set secure option indicating TLS is required
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 37e5171..95bc0e3 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -73,8 +73,10 @@ public class NatsConsumer extends DefaultConsumer {
     protected void doStop() throws Exception {
         super.doStop();
 
-        LOG.debug("Flushing Messages before stopping");
-        connection.flush();
+        if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+            LOG.debug("Flushing Messages before stopping");
+            connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+        }
         
         try {
             sid.unsubscribe();

http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 0efafe2..1be13e3 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -78,6 +78,10 @@ public class NatsProducer extends DefaultProducer {
         
         LOG.debug("Closing Nats Connection");
         if (connection != null && !connection.isClosed()) {
+            if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
+                LOG.debug("Flushing Nats Connection");
+                connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+            }
             connection.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/cb5617e0/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index ca63048..24d4877 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -45,8 +45,8 @@ public class NatsConsumerTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:send").to("nats://localhost:4222?topic=test");
-                from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
+                from("direct:send").to("nats://localhost:4222?topic=test&flushConnection=true");
+                from("nats://localhost:4222?topic=test&flushConnection=true").to(mockResultEndpoint);
             }
         };
     }