You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:20 UTC

(camel) 04/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0731a28208698444e717edf72414fbaa5faee63a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 10:59:38 2024 +0100

    CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
 .../java/org/apache/camel/processor/Enricher.java  | 19 ++++-
 .../org/apache/camel/processor/PollEnricher.java   | 17 +++-
 .../camel/processor/EnrichVariableHeadersTest.java | 94 ++++++++++++++++++++++
 .../processor/PollEnrichVariableHeadersTest.java   | 62 ++++++++++++++
 .../org/apache/camel/support/ExchangeHelper.java   |  6 +-
 5 files changed, 187 insertions(+), 11 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index bb8350971ec..9d641b01a05 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Map;
+
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -26,6 +28,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -63,6 +66,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private boolean ignoreInvalidEndpoint;
     private boolean allowOptimisedComponents = true;
     private boolean autoStartupComponents = true;
+    private HeadersMapFactory headersMapFactory;
     private ProcessorExchangeFactory processorExchangeFactory;
     private SendDynamicProcessor sendDynamicProcessor;
 
@@ -188,9 +192,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         // if we should store the received message body in a variable,
         // then we need to preserve the original message body
         Object body = null;
+        Map<String, Object> headers = null;
         if (variableReceive != null) {
             try {
                 body = exchange.getMessage().getBody();
+                // do a defensive copy of the headers
+                headers = headersMapFactory.newMap(exchange.getMessage().getHeaders());
             } catch (Exception throwable) {
                 exchange.setException(throwable);
                 callback.done(true);
@@ -198,6 +205,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
             }
         }
         final Object originalBody = body;
+        final Map<String, Object> originalHeaders = headers;
 
         return sendDynamicProcessor.process(resourceExchange, new AsyncCallback() {
             @Override
@@ -216,9 +224,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                         if (aggregatedExchange != null) {
                             if (variableReceive != null) {
                                 // result should be stored in variable instead of message body
-                                Object value = aggregatedExchange.getMessage().getBody();
-                                ExchangeHelper.setVariable(exchange, variableReceive, value);
-                                aggregatedExchange.getMessage().setBody(originalBody);
+                                ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+                                exchange.getMessage().setBody(originalBody);
+                                exchange.getMessage().setHeaders(originalHeaders);
                             }
                             // copy aggregation result onto original exchange (preserving pattern)
                             copyResultsWithoutCorrelationId(exchange, aggregatedExchange);
@@ -302,6 +310,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         ServiceHelper.buildService(processorExchangeFactory, sendDynamicProcessor);
     }
 
+    @Override
+    protected void doInit() throws Exception {
+        headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processorExchangeFactory, aggregationStrategy, sendDynamicProcessor);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 21759c73f24..4005d83834c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Map;
+
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -32,6 +34,7 @@ import org.apache.camel.PollingConsumer;
 import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.RouteIdAware;
@@ -64,7 +67,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
 
     private CamelContext camelContext;
     private ConsumerCache consumerCache;
-    protected volatile String scheme;
+    private HeadersMapFactory headersMapFactory;
+    private volatile String scheme;
     private String id;
     private String routeId;
     private String variableReceive;
@@ -320,9 +324,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         // if we should store the received message body in a variable,
         // then we need to preserve the original message body
         Object originalBody = null;
+        Map<String, Object> originalHeaders = null;
         if (variableReceive != null) {
             try {
                 originalBody = exchange.getMessage().getBody();
+                // do a defensive copy of the headers
+                originalHeaders = headersMapFactory.newMap(exchange.getMessage().getHeaders());
             } catch (Exception throwable) {
                 exchange.setException(throwable);
                 callback.done(true);
@@ -345,9 +352,9 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
                 if (aggregatedExchange != null) {
                     if (variableReceive != null) {
                         // result should be stored in variable instead of message body
-                        Object value = aggregatedExchange.getMessage().getBody();
-                        ExchangeHelper.setVariable(exchange, variableReceive, value);
-                        aggregatedExchange.getMessage().setBody(originalBody);
+                        ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+                        exchange.getMessage().setBody(originalBody);
+                        exchange.getMessage().setHeaders(originalHeaders);
                     }
                     // copy aggregation result onto original exchange (preserving pattern)
                     copyResultsPreservePattern(exchange, aggregatedExchange);
@@ -485,6 +492,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
             scheme = ExchangeHelper.resolveScheme(u);
         }
 
+        headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
+
         ServiceHelper.initService(consumerCache, aggregationStrategy);
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java
new file mode 100644
index 00000000000..229e5c86d4e
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class EnrichVariableHeadersTest extends ContextTestSupport {
+
+    @Test
+    public void testSend() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").message(0).header("echo").isEqualTo("CamelCamel");
+
+        template.sendBody("direct:send", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testReceive() throws Exception {
+        getMockEndpoint("mock:after").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:after").message(0).header("echo").isNull();
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+        template.sendBody("direct:receive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSendAndReceive() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+        getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+        template.sendBody("direct:sendAndReceive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:send")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .enrich().constant("direct:foo").variableSend("hello")
+                        .to("mock:result");
+
+                from("direct:receive")
+                        .enrich().constant("direct:foo").variableReceive("bye")
+                        .to("mock:after")
+                        .setBody(simple("${variable:bye}"))
+                        .to("mock:result");
+
+                from("direct:sendAndReceive")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .enrich().constant("direct:foo").variableSend("hello").variableReceive("bye")
+                        .to("mock:result");
+
+                from("direct:foo")
+                        .setHeader("echo", simple("${body}${body}"))
+                        .transform().simple("Bye ${body}");
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
new file mode 100644
index 00000000000..852e3b0cd13
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PollEnrichVariableHeadersTest extends ContextTestSupport {
+
+    @Test
+    public void testReceive() throws Exception {
+        template.sendBodyAndHeader("seda:foo", "Bye World", "echo", "CamelCamel");
+
+        getMockEndpoint("mock:after").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:result").message(0).header("echo").isNull();
+        getMockEndpoint("mock:result").whenAnyExchangeReceived(e -> {
+            Map m = e.getVariable("bye.headers", Map.class);
+            Assertions.assertNotNull(m);
+            Assertions.assertEquals(1, m.size());
+            Assertions.assertEquals("CamelCamel", m.get("echo"));
+        });
+
+        template.sendBody("direct:receive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:receive")
+                        .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye")
+                        .to("mock:after")
+                        .setBody(simple("${variable:bye}"))
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 85c34ecf86a..02df6ccce7b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1094,8 +1094,7 @@ public final class ExchangeHelper {
                 name = StringHelper.after(name, ":");
                 repo.setVariable(name, value);
             } else {
-                exchange.setException(
-                        new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+                throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
             }
         } else {
             exchange.setVariable(name, value);
@@ -1110,8 +1109,7 @@ public final class ExchangeHelper {
                     = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
             repo = factory.getVariableRepository(id);
             if (repo == null) {
-                exchange.setException(
-                        new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+                throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
             }
             name = StringHelper.after(name, ":");
         }