You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/08 08:35:24 UTC

[camel] branch master updated: Add support for nats requests. (#5034)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 054bd72  Add support for nats requests. (#5034)
054bd72 is described below

commit 054bd725b7c384a80fc86547fd85c495a006f09d
Author: James Hilliard <ja...@gmail.com>
AuthorDate: Mon Feb 8 01:35:05 2021 -0700

    Add support for nats requests. (#5034)
---
 .../apache/camel/catalog/docs/nats-component.adoc  |  3 +-
 .../component/nats/NatsEndpointConfigurer.java     |  6 ++
 .../component/nats/NatsEndpointUriFactory.java     |  3 +-
 .../org/apache/camel/component/nats/nats.json      |  1 +
 .../camel-nats/src/main/docs/nats-component.adoc   |  3 +-
 .../camel/component/nats/NatsConfiguration.java    | 13 ++++
 .../apache/camel/component/nats/NatsConstants.java |  1 +
 .../apache/camel/component/nats/NatsProducer.java  | 89 +++++++++++++++++++---
 .../component/nats/NatsProducerReplyToTest.java    | 60 +++++++++++++++
 .../nats/NatsProducerReplyToTimeoutTest.java       | 59 ++++++++++++++
 .../endpoint/dsl/NatsEndpointBuilderFactory.java   | 30 ++++++++
 .../modules/ROOT/pages/nats-component.adoc         |  3 +-
 12 files changed, 258 insertions(+), 13 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/nats-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/nats-component.adoc
index 582ff638..42cc819 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/nats-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/nats-component.adoc
@@ -79,7 +79,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -108,6 +108,7 @@ with the following path and query parameters:
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *replySubject* (producer) | the subject to which subscribers should send response |  | String
+| *requestTimeout* (producer) | Request timeout in milliseconds | 20000 | long
 | *connection* (advanced) | Reference an already instantiated connection to Nats server |  | Connection
 | *traceConnection* (advanced) | Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues. | false | boolean
 | *secure* (security) | Set secure option indicating TLS is required | false | boolean
diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
index c4c4b40..aafc444 100644
--- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
+++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
@@ -62,6 +62,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "replyToDisabled": target.getConfiguration().setReplyToDisabled(property(camelContext, boolean.class, value)); return true;
         case "requestcleanupinterval":
         case "requestCleanupInterval": target.getConfiguration().setRequestCleanupInterval(property(camelContext, int.class, value)); return true;
+        case "requesttimeout":
+        case "requestTimeout": target.getConfiguration().setRequestTimeout(property(camelContext, long.class, value)); return true;
         case "secure": target.getConfiguration().setSecure(property(camelContext, boolean.class, value)); return true;
         case "servers": target.getConfiguration().setServers(property(camelContext, java.lang.String.class, value)); return true;
         case "sslcontextparameters":
@@ -117,6 +119,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "replyToDisabled": return boolean.class;
         case "requestcleanupinterval":
         case "requestCleanupInterval": return int.class;
+        case "requesttimeout":
+        case "requestTimeout": return long.class;
         case "secure": return boolean.class;
         case "servers": return java.lang.String.class;
         case "sslcontextparameters":
@@ -173,6 +177,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "replyToDisabled": return target.getConfiguration().isReplyToDisabled();
         case "requestcleanupinterval":
         case "requestCleanupInterval": return target.getConfiguration().getRequestCleanupInterval();
+        case "requesttimeout":
+        case "requestTimeout": return target.getConfiguration().getRequestTimeout();
         case "secure": return target.getConfiguration().isSecure();
         case "servers": return target.getConfiguration().getServers();
         case "sslcontextparameters":
diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
index 192edff..780aa1f 100644
--- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
+++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(28);
+        Set<String> props = new HashSet<>(29);
         props.add("replySubject");
         props.add("maxMessages");
         props.add("sslContextParameters");
@@ -34,6 +34,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E
         props.add("traceConnection");
         props.add("connectionTimeout");
         props.add("reconnectTimeWait");
+        props.add("requestTimeout");
         props.add("pingInterval");
         props.add("noRandomizeServers");
         props.add("poolSize");
diff --git a/components/camel-nats/src/generated/resources/org/apache/camel/component/nats/nats.json b/components/camel-nats/src/generated/resources/org/apache/camel/component/nats/nats.json
index ce031e4..97f0e96 100644
--- a/components/camel-nats/src/generated/resources/org/apache/camel/component/nats/nats.json
+++ b/components/camel-nats/src/generated/resources/org/apache/camel/component/nats/nats.json
@@ -54,6 +54,7 @@
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...]
     "replySubject": { "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" },
+    "requestTimeout": { "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" },
     "connection": { "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" },
     "traceConnection": { "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine grained debug [...]
     "secure": { "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" },
diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 582ff638..42cc819 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -79,7 +79,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -108,6 +108,7 @@ with the following path and query parameters:
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *replySubject* (producer) | the subject to which subscribers should send response |  | String
+| *requestTimeout* (producer) | Request timeout in milliseconds | 20000 | long
 | *connection* (advanced) | Reference an already instantiated connection to Nats server |  | Connection
 | *traceConnection* (advanced) | Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues. | false | boolean
 | *secure* (security) | Set secure option indicating TLS is required | false | boolean
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 2e6553a..6eea512 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -57,6 +57,8 @@ public class NatsConfiguration {
     private int maxPingsOut = Options.DEFAULT_MAX_PINGS_OUT;
     @UriParam(label = "common", defaultValue = "5000")
     private int requestCleanupInterval = 5000;
+    @UriParam(label = "producer", defaultValue = "20000")
+    private long requestTimeout = 20000L;
     @UriParam(label = "producer")
     private String replySubject;
     @UriParam
@@ -193,6 +195,17 @@ public class NatsConfiguration {
     }
 
     /**
+     * Request timeout in milliseconds
+     */
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    public long getRequestTimeout() {
+        return this.requestTimeout;
+    }
+
+    /**
      * Ping interval to be aware if connection is still alive (in milliseconds)
      */
     public int getPingInterval() {
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
index 9b2b422..14511b0 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
@@ -23,4 +23,5 @@ public interface NatsConstants {
     String NATS_REPLY_TO = "CamelNatsReplyTo";
     String NATS_SUBJECT = "CamelNatsSubject";
     String NATS_QUEUE_NAME = "CamelNatsQueueName";
+    String NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME = "CamelNatsRequestTimeoutExecutor";
 }
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 3fdcc5a..be08269 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -17,23 +17,37 @@
 package org.apache.camel.component.nats;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import io.nats.client.Connection;
 import io.nats.client.Connection.Status;
+import io.nats.client.Message;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NatsProducer extends DefaultProducer {
+public class NatsProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
 
+    private final ExecutorServiceManager executorServiceManager;
+
+    private ScheduledExecutorService scheduler;
+
     private Connection connection;
 
     public NatsProducer(NatsEndpoint endpoint) {
         super(endpoint);
+        this.executorServiceManager = endpoint.getCamelContext().getExecutorServiceManager();
     }
 
     @Override
@@ -42,26 +56,80 @@ public class NatsProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         NatsConfiguration config = getEndpoint().getConfiguration();
         byte[] body = exchange.getIn().getBody(byte[].class);
         if (body == null) {
             // fallback to use string
-            body = exchange.getIn().getMandatoryBody(String.class).getBytes();
+            try {
+                body = exchange.getIn().getMandatoryBody(String.class).getBytes();
+            } catch (InvalidPayloadException e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
         }
 
-        LOG.debug("Publishing to topic: {}", config.getTopic());
+        if (exchange.getPattern().isOutCapable()) {
+            LOG.debug("Requesting to topic: {}", config.getTopic());
 
-        if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
-            String replySubject = config.getReplySubject();
-            connection.publish(config.getTopic(), replySubject, body);
+            CompletableFuture<Message> requestFuture = connection.request(config.getTopic(), body);
+            CompletableFuture timeoutFuture = this.failAfter(exchange, Duration.ofMillis(config.getRequestTimeout()));
+            CompletableFuture.anyOf(requestFuture, timeoutFuture).whenComplete((message, e) -> {
+                if (e == null) {
+                    Message msg = (Message) message;
+                    exchange.getMessage().setBody(msg.getData());
+                    exchange.getMessage().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo());
+                    exchange.getMessage().setHeader(NatsConstants.NATS_SID, msg.getSID());
+                    exchange.getMessage().setHeader(NatsConstants.NATS_SUBJECT, msg.getSubject());
+                    exchange.getMessage().setHeader(NatsConstants.NATS_QUEUE_NAME, msg.getSubscription().getQueueName());
+                    exchange.getMessage().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+                } else {
+                    exchange.setException(e.getCause());
+                }
+                callback.done(false);
+                if (!requestFuture.isDone()) {
+                    requestFuture.cancel(true);
+                }
+                if (!timeoutFuture.isDone()) {
+                    timeoutFuture.cancel(true);
+                }
+            });
+            return false;
         } else {
-            connection.publish(config.getTopic(), body);
+            LOG.debug("Publishing to topic: {}", config.getTopic());
+
+            if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
+                String replySubject = config.getReplySubject();
+                connection.publish(config.getTopic(), replySubject, body);
+            } else {
+                connection.publish(config.getTopic(), body);
+            }
+            callback.done(true);
+            return true;
         }
     }
 
+    private <T> CompletableFuture<T> failAfter(Exchange exchange, Duration duration) {
+        final CompletableFuture<T> future = new CompletableFuture<>();
+        scheduler.schedule(() -> {
+            final ExchangeTimedOutException ex = new ExchangeTimedOutException(exchange, duration.toMillis());
+            return future.completeExceptionally(ex);
+        }, duration.toNanos(), TimeUnit.NANOSECONDS);
+        return future;
+    }
+
     @Override
     protected void doStart() throws Exception {
+        // try to lookup a pool first based on profile
+        ThreadPoolProfile profile
+                = this.executorServiceManager.getThreadPoolProfile(NatsConstants.NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME);
+        if (profile == null) {
+            profile = this.executorServiceManager.getDefaultThreadPoolProfile();
+        }
+        this.scheduler
+                = this.executorServiceManager.newScheduledThreadPool(this,
+                        NatsConstants.NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME, profile);
         super.doStart();
         LOG.debug("Starting Nats Producer");
 
@@ -72,6 +140,9 @@ public class NatsProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
+        if (this.scheduler != null) {
+            this.executorServiceManager.shutdownNow(this.scheduler);
+        }
         LOG.debug("Stopping Nats Producer");
         if (ObjectHelper.isEmpty(getEndpoint().getConfiguration().getConnection())) {
             LOG.debug("Closing Nats Connection");
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTest.java
new file mode 100644
index 0000000..27db16f
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class NatsProducerReplyToTest extends NatsTestSupport {
+
+    protected String startUri = "direct:start";
+    protected String middleUri = "nats:foo.middle";
+    protected String resultUri = "mock:result";
+
+    protected MockEndpoint resultEndpoint;
+    protected String body1 = "Camel";
+    protected String body2 = "World";
+
+    @Test
+    public void testReplyTo() throws Exception {
+        resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("Bye Camel", "Bye World");
+
+        String out = template.requestBody(startUri, body1, String.class);
+        String out2 = template.requestBody(startUri, body2, String.class);
+
+        resultEndpoint.assertIsSatisfied();
+
+        assertEquals("Bye Camel", out);
+        assertEquals("Bye World", out2);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(startUri).to(middleUri).to(resultUri);
+
+                from(middleUri)
+                        .transform(simple("Bye ${body}"));
+            }
+        };
+    }
+}
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTimeoutTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTimeoutTest.java
new file mode 100644
index 0000000..73640ad
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerReplyToTimeoutTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class NatsProducerReplyToTimeoutTest extends NatsTestSupport {
+
+    protected String startUri = "direct:start";
+    protected String middleUri = "nats:foo.middle?requestTimeout=50";
+    protected String resultUri = "mock:result";
+
+    protected String body1 = "Camel";
+
+    @Test
+    public void testReplyToTimeout() {
+        try {
+            template.requestBody(startUri, body1, String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(50, cause.getTimeout());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(startUri).to(middleUri).to(resultUri);
+
+                from(middleUri)
+                        .delayer(5000)
+                        .transform(simple("Bye ${body}"));
+            }
+        };
+    }
+}
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 9077f51..4775673 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -1332,6 +1332,36 @@ public interface NatsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Request timeout in milliseconds.
+         * 
+         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 20000
+         * Group: producer
+         * 
+         * @param requestTimeout the value to set
+         * @return the dsl builder
+         */
+        default NatsEndpointProducerBuilder requestTimeout(long requestTimeout) {
+            doSetProperty("requestTimeout", requestTimeout);
+            return this;
+        }
+        /**
+         * Request timeout in milliseconds.
+         * 
+         * The option will be converted to a &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 20000
+         * Group: producer
+         * 
+         * @param requestTimeout the value to set
+         * @return the dsl builder
+         */
+        default NatsEndpointProducerBuilder requestTimeout(String requestTimeout) {
+            doSetProperty("requestTimeout", requestTimeout);
+            return this;
+        }
+        /**
          * Set secure option indicating TLS is required.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
diff --git a/docs/components/modules/ROOT/pages/nats-component.adoc b/docs/components/modules/ROOT/pages/nats-component.adoc
index 1b96efa..d10144d 100644
--- a/docs/components/modules/ROOT/pages/nats-component.adoc
+++ b/docs/components/modules/ROOT/pages/nats-component.adoc
@@ -81,7 +81,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -110,6 +110,7 @@ with the following path and query parameters:
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *replySubject* (producer) | the subject to which subscribers should send response |  | String
+| *requestTimeout* (producer) | Request timeout in milliseconds | 20000 | long
 | *connection* (advanced) | Reference an already instantiated connection to Nats server |  | Connection
 | *traceConnection* (advanced) | Whether or not connection trace messages should be printed to standard out for fine grained debugging of connection issues. | false | boolean
 | *secure* (security) | Set secure option indicating TLS is required | false | boolean