You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/04 07:06:51 UTC

[camel] 01/02: CAMEL-14251: camel-nats - Store message payload in body/headers

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 15bacd5fa5c2c327f5166ef2c48e8d7a0c4c6615
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 4 07:59:08 2019 +0100

    CAMEL-14251: camel-nats - Store message payload in body/headers
---
 .../java/org/apache/camel/component/nats/NatsConstants.java    |  4 ++++
 .../java/org/apache/camel/component/nats/NatsConsumer.java     |  6 +++++-
 .../java/org/apache/camel/component/nats/NatsConsumerTest.java | 10 +++++-----
 .../camel/component/nats/NatsConsumerWithRedeliveryTest.java   |  4 +---
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
index a927360..9b2b422 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
@@ -19,4 +19,8 @@ package org.apache.camel.component.nats;
 public interface NatsConstants {
 
     String NATS_MESSAGE_TIMESTAMP = "CamelNatsMessageTimestamp";
+    String NATS_SID = "CamelNatsSID";
+    String NATS_REPLY_TO = "CamelNatsReplyTo";
+    String NATS_SUBJECT = "CamelNatsSubject";
+    String NATS_QUEUE_NAME = "CamelNatsQueueName";
 }
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 1a0611a..bbcd715 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -144,7 +144,11 @@ public class NatsConsumer extends DefaultConsumer {
             public void onMessage(Message msg) throws InterruptedException {
                 log.debug("Received Message: {}", msg);
                 Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody(msg);
+                exchange.getIn().setBody(msg.getData());
+                exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo());
+                exchange.getIn().setHeader(NatsConstants.NATS_SID, msg.getSID());
+                exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT, msg.getSubject());
+                exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME, msg.getSubscription().getQueueName());
                 exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
                 try {
                     processor.process(exchange);
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index bcbfeac..6267648 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.nats;
 
-import java.io.IOException;
-
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -29,9 +27,11 @@ public class NatsConsumerTest extends NatsTestSupport {
     protected MockEndpoint mockResultEndpoint;
 
     @Test
-    public void testConsumer() throws InterruptedException, IOException {
-        mockResultEndpoint.expectedMessageCount(1);
-        template.requestBody("direct:send", "test");
+    public void testConsumer() throws Exception {
+        mockResultEndpoint.expectedBodiesReceived("Hello World");
+        mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "test");
+
+        template.requestBody("direct:send", "Hello World");
 
         mockResultEndpoint.assertIsSatisfied();
     }
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerWithRedeliveryTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerWithRedeliveryTest.java
index 1f965cc..afea0dc 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerWithRedeliveryTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerWithRedeliveryTest.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.nats;
 
 import java.io.IOException;
 
-import io.nats.client.Message;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -66,8 +65,7 @@ public class NatsConsumerWithRedeliveryTest extends NatsTestSupport {
 
                     @Override
                     public boolean matches(Exchange exchange) {
-                        Message g = exchange.getIn().getBody(Message.class);
-                        String s = new String(g.getData());
+                        String s = exchange.getMessage().getBody(String.class);
                         if (s.contains("test")) {
                             return true;
                         }