You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/04 07:29:55 UTC

[camel] 01/02: CAMEL-14252: camel-nats - Add support for reply-to in consumer

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 835c7c93fa62c941ff36946dfaeb9558dddb28c1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 4 08:23:37 2019 +0100

    CAMEL-14252: camel-nats - Add support for reply-to in consumer
---
 .../camel-nats/src/main/docs/nats-component.adoc   | 20 ++++++++++++++----
 .../camel/component/nats/NatsConfiguration.java    | 13 ++++++++++++
 .../apache/camel/component/nats/NatsConsumer.java  | 12 +++++++++++
 ...sumerTest.java => NatsConsumerReplyToTest.java} | 21 +++++++++++++------
 .../camel/component/nats/NatsConsumerTest.java     |  2 +-
 .../endpoint/dsl/NatsEndpointBuilderFactory.java   | 24 ++++++++++++++++++++++
 6 files changed, 81 insertions(+), 11 deletions(-)

diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 7a2f06b..ff05266 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -97,6 +97,7 @@ with the following path and query parameters:
 | *maxMessages* (consumer) | Stop receiving messages from a topic we are subscribing to after maxMessages |  | String
 | *poolSize* (consumer) | Consumer pool size | 10 | int
 | *queueName* (consumer) | The Queue name if we are using nats for a queue configuration |  | String
+| *replyToDisabled* (consumer) | Can be used to turn off sending back reply message in the consumer. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
@@ -147,10 +148,21 @@ The component supports 5 options, which are listed below.
 [width="100%",options="header"]
 |=======================================================================
 |Name |Type |Description
-
-|CamelNatsMessageTimestamp |long |The timestamp of a consumed message.
+| CamelNatsSID | String | The SID of a consumed message.
+| CamelNatsReplyTo | String | The ReplyTo of a consumed message (may be null).
+| CamelNatsSubject | String | The Subject of a consumed message.
+| CamelNatsQueueName | String | The Queue name of a consumed message (may be null).
+| CamelNatsMessageTimestamp | long | The timestamp of a consumed message.
 |=======================================================================
- 
+
+== Request/Reply support
+The producer only supports publishing (sending) messages.
+The producer does not support request/reply where it can wait for an expected reply message.
+
+The consumer will when routing the message is complete, send back the message as reply-message if required.
+
+== Examples
+
 *Producer example:*
 
 [source,java]
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index d046a8f..e053b96 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -66,6 +66,8 @@ public class NatsConfiguration {
     @UriParam(label = "consumer")
     private String queueName;
     @UriParam(label = "consumer")
+    private boolean replyToDisabled;
+    @UriParam(label = "consumer")
     private String maxMessages;
     @UriParam(label = "consumer", defaultValue = "10")
     private int poolSize = 10;
@@ -258,6 +260,17 @@ public class NatsConfiguration {
         this.queueName = queueName;
     }
 
+    public boolean isReplyToDisabled() {
+        return replyToDisabled;
+    }
+
+    /**
+     * Can be used to turn off sending back reply message in the consumer.
+     */
+    public void setReplyToDisabled(boolean replyToDisabled) {
+        this.replyToDisabled = replyToDisabled;
+    }
+
     /**
      * Stop receiving messages from a topic we are subscribing to after
      * maxMessages
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index bbcd715..5c905aa 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -155,6 +155,18 @@ public class NatsConsumer extends DefaultConsumer {
                 } catch (Exception e) {
                     getExceptionHandler().handleException("Error during processing", exchange, e);
                 }
+
+                // is there a reply?
+                if (!configuration.isReplyToDisabled()
+                        && msg.getReplyTo() != null && msg.getConnection() != null) {
+                    Connection con = msg.getConnection();
+                    byte[] data = exchange.getMessage().getBody(byte[].class);
+                    if (data != null) {
+                        log.debug("Publishing replyTo: {} message", msg.getReplyTo());
+                        con.publish(msg.getReplyTo(), data);
+                    }
+                }
+
             }
         }
     }
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
similarity index 66%
copy from components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
copy to components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
index 6267648..33a5d5b 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerReplyToTest.java
@@ -21,19 +21,23 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-public class NatsConsumerTest extends NatsTestSupport {
+public class NatsConsumerReplyToTest extends NatsTestSupport {
 
     @EndpointInject("mock:result")
     protected MockEndpoint mockResultEndpoint;
 
     @Test
-    public void testConsumer() throws Exception {
-        mockResultEndpoint.expectedBodiesReceived("Hello World");
+    public void testReplyTo() throws Exception {
+        mockResultEndpoint.expectedBodiesReceived("World");
         mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "test");
 
-        template.requestBody("direct:send", "Hello World");
+        template.sendBody("direct:send", "World");
 
         mockResultEndpoint.assertIsSatisfied();
+
+        // grab reply message from the reply queue
+        String out = consumer.receiveBody("nats://"  + getNatsUrl() + "?topic=myReplyQueue", 5000, String.class);
+        assertEquals("Bye World", out);
     }
 
     @Override
@@ -41,8 +45,13 @@ public class NatsConsumerTest extends NatsTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:send").to("nats://"  + getNatsUrl() + "?topic=test&flushConnection=true");
-                from("nats://" + getNatsUrl() + "?topic=test&flushConnection=true").to(mockResultEndpoint);
+                from("direct:send")
+                        .to("nats://"  + getNatsUrl() + "?topic=test&replySubject=myReplyQueue&flushConnection=true");
+
+                from("nats://" + getNatsUrl() + "?topic=test&flushConnection=true")
+                        .to(mockResultEndpoint)
+                        .convertBodyTo(String.class)
+                        .setBody().simple("Bye ${body}");
             }
         };
     }
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index 6267648..85676a4 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -31,7 +31,7 @@ public class NatsConsumerTest extends NatsTestSupport {
         mockResultEndpoint.expectedBodiesReceived("Hello World");
         mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "test");
 
-        template.requestBody("direct:send", "Hello World");
+        template.sendBody("direct:send", "Hello World");
 
         mockResultEndpoint.assertIsSatisfied();
     }
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 9e27ffc..b47029a 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -468,6 +468,30 @@ public interface NatsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Can be used to turn off sending back reply message in the consumer.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default NatsEndpointConsumerBuilder replyToDisabled(
+                boolean replyToDisabled) {
+            doSetProperty("replyToDisabled", replyToDisabled);
+            return this;
+        }
+        /**
+         * Can be used to turn off sending back reply message in the consumer.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default NatsEndpointConsumerBuilder replyToDisabled(
+                String replyToDisabled) {
+            doSetProperty("replyToDisabled", replyToDisabled);
+            return this;
+        }
+        /**
          * Set secure option indicating TLS is required.
          * 
          * The option is a: <code>boolean</code> type.