You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:19 UTC
(camel) 03/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7fe558b0de3bdfcd39e9f22ce9289cb1b266bdf5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 10:29:47 2024 +0100
CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
.../java/org/apache/camel/builder/Builder.java | 2 +-
.../org/apache/camel/builder/DataFormatClause.java | 5 +-
.../camel/builder/ExpressionClauseSupport.java | 2 +-
.../camel/builder/LanguageBuilderFactory.java | 2 +-
.../camel/model/dataformat/BeanioDataFormat.java | 11 +--
.../camel/model/language/WasmExpression.java | 2 +-
.../camel/processor/SendDynamicProcessor.java | 11 ++-
.../org/apache/camel/processor/SendProcessor.java | 7 +-
.../dataformat/BeanioDataFormatReifier.java | 4 +-
.../apache/camel/processor/FromVariableTest.java | 4 +-
.../processor/ToDynamicVariableHeadersTest.java | 93 +++++++++++++++++++++
.../camel/processor/ToVariableHeadersTest.java | 94 ++++++++++++++++++++++
.../org/apache/camel/support/ExchangeHelper.java | 3 +-
13 files changed, 220 insertions(+), 20 deletions(-)
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/Builder.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/Builder.java
index 042812ab99c..1b0ffec14c8 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/Builder.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/Builder.java
@@ -28,8 +28,8 @@ import org.apache.camel.model.language.JsonPathExpression;
import org.apache.camel.model.language.LanguageExpression;
import org.apache.camel.model.language.MethodCallExpression;
import org.apache.camel.model.language.SimpleExpression;
-import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.model.language.VariableExpression;
+import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.util.ObjectHelper;
/**
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
index 63f314a77b9..4855d3502ff 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
@@ -186,8 +186,9 @@ public class DataFormatClause<T extends ProcessorDefinition<?>> {
/**
* Uses the beanio data format
*/
- public T beanio(String mapping, String streamName, String encoding, boolean ignoreUnidentifiedRecords,
- boolean ignoreUnexpectedRecords, boolean ignoreInvalidRecords) {
+ public T beanio(
+ String mapping, String streamName, String encoding, boolean ignoreUnidentifiedRecords,
+ boolean ignoreUnexpectedRecords, boolean ignoreInvalidRecords) {
BeanioDataFormat dataFormat = new BeanioDataFormat();
dataFormat.setMapping(mapping);
dataFormat.setStreamName(streamName);
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExpressionClauseSupport.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExpressionClauseSupport.java
index 1efd4b30997..fd3e07ad584 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExpressionClauseSupport.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExpressionClauseSupport.java
@@ -44,8 +44,8 @@ import org.apache.camel.model.language.RefExpression;
import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.model.language.SpELExpression;
import org.apache.camel.model.language.TokenizerExpression;
-import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.model.language.VariableExpression;
+import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.model.language.XMLTokenizerExpression;
import org.apache.camel.model.language.XPathExpression;
import org.apache.camel.model.language.XQueryExpression;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/LanguageBuilderFactory.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/LanguageBuilderFactory.java
index 4b225993b8a..d3cd60dc6cd 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/LanguageBuilderFactory.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/LanguageBuilderFactory.java
@@ -37,8 +37,8 @@ import org.apache.camel.model.language.RefExpression;
import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.model.language.SpELExpression;
import org.apache.camel.model.language.TokenizerExpression;
-import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.model.language.VariableExpression;
+import org.apache.camel.model.language.WasmExpression;
import org.apache.camel.model.language.XMLTokenizerExpression;
import org.apache.camel.model.language.XPathExpression;
import org.apache.camel.model.language.XQueryExpression;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/BeanioDataFormat.java b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/BeanioDataFormat.java
index 744b647454b..0b175a8b50f 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/BeanioDataFormat.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/BeanioDataFormat.java
@@ -21,6 +21,7 @@ import jakarta.xml.bind.annotation.XmlAccessorType;
import jakarta.xml.bind.annotation.XmlAttribute;
import jakarta.xml.bind.annotation.XmlRootElement;
import jakarta.xml.bind.annotation.XmlTransient;
+
import org.apache.camel.builder.DataFormatBuilder;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.spi.Metadata;
@@ -183,8 +184,8 @@ public class BeanioDataFormat extends DataFormatDefinition {
private String unmarshalSingleObject;
/**
- * The BeanIO mapping file. Is by default loaded from the classpath. You can prefix with file:, http:, or classpath:
- * to denote from where to load the mapping file.
+ * The BeanIO mapping file. Is by default loaded from the classpath. You can prefix with file:, http:, or
+ * classpath: to denote from where to load the mapping file.
*/
public BeanioDataFormat.Builder mapping(String mapping) {
this.mapping = mapping;
@@ -258,8 +259,8 @@ public class BeanioDataFormat extends DataFormatDefinition {
}
/**
- * To use a custom org.apache.camel.dataformat.beanio.BeanIOErrorHandler as error handler while parsing. Configure
- * the fully qualified class name of the error handler. Notice the options ignoreUnidentifiedRecords,
+ * To use a custom org.apache.camel.dataformat.beanio.BeanIOErrorHandler as error handler while parsing.
+ * Configure the fully qualified class name of the error handler. Notice the options ignoreUnidentifiedRecords,
* ignoreUnexpectedRecords, and ignoreInvalidRecords may not be in use when you use a custom error handler.
*/
public BeanioDataFormat.Builder beanReaderErrorHandlerType(String beanReaderErrorHandlerType) {
@@ -293,4 +294,4 @@ public class BeanioDataFormat extends DataFormatDefinition {
}
}
-}
\ No newline at end of file
+}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/language/WasmExpression.java b/core/camel-core-model/src/main/java/org/apache/camel/model/language/WasmExpression.java
index 65c608cb5f5..f1b898477f7 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/language/WasmExpression.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/language/WasmExpression.java
@@ -21,6 +21,7 @@ import jakarta.xml.bind.annotation.XmlAccessorType;
import jakarta.xml.bind.annotation.XmlAttribute;
import jakarta.xml.bind.annotation.XmlRootElement;
import jakarta.xml.bind.annotation.XmlTransient;
+
import org.apache.camel.spi.Metadata;
/**
@@ -42,7 +43,6 @@ public class WasmExpression extends TypedExpressionDefinition {
super(expression);
}
-
public WasmExpression(String expression, String module) {
super(expression);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 19ab67b9421..76633de094d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
@@ -29,6 +31,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
@@ -44,8 +47,6 @@ import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* Processor for forwarding exchanges to a dynamic endpoint destination.
*
@@ -64,6 +65,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
protected String variableReceive;
protected ExchangePattern pattern;
protected ProducerCache producerCache;
+ protected HeadersMapFactory headersMapFactory;
protected String id;
protected String routeId;
protected boolean ignoreInvalidEndpoint;
@@ -183,7 +185,8 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
if (variableReceive != null) {
try {
body = exchange.getMessage().getBody();
- headers = exchange.getMessage().getHeaders();
+ // do a defensive copy of the headers
+ headers = headersMapFactory.newMap(exchange.getMessage().getHeaders());
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -379,6 +382,8 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
}
ServiceHelper.initService(dynamicAware);
+
+ headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 907482a70e0..d8acea55531 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Traceable;
+import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteIdAware;
@@ -57,6 +58,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
protected final ExchangePattern pattern;
protected ProducerCache producerCache;
protected AsyncProducer producer;
+ protected HeadersMapFactory headersMapFactory;
protected final Endpoint destination;
protected String variableSend;
protected String variableReceive;
@@ -136,7 +138,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
if (variableReceive != null) {
try {
body = exchange.getMessage().getBody();
- headers = exchange.getMessage().getHeaders();
+ // do a defensive copy of the headers
+ headers = headersMapFactory.newMap(exchange.getMessage().getHeaders());
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -294,6 +297,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
producerCache = new DefaultProducerCache(this, camelContext, 0);
// do not add as service as we do not want to manage the producer cache
}
+
+ headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory();
}
@Override
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/dataformat/BeanioDataFormatReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/dataformat/BeanioDataFormatReifier.java
index 1e49560407a..bd9708e3612 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/dataformat/BeanioDataFormatReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/dataformat/BeanioDataFormatReifier.java
@@ -16,12 +16,12 @@
*/
package org.apache.camel.reifier.dataformat;
+import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.dataformat.BeanioDataFormat;
-import java.util.Map;
-
public class BeanioDataFormatReifier extends DataFormatReifier<BeanioDataFormat> {
public BeanioDataFormatReifier(CamelContext camelContext, DataFormatDefinition definition) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/FromVariableTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/FromVariableTest.java
index cc1364e31e0..e1227bb9bce 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/FromVariableTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/FromVariableTest.java
@@ -16,13 +16,13 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.Map;
-
public class FromVariableTest extends ContextTestSupport {
@Test
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableHeadersTest.java
new file mode 100644
index 00000000000..12e4c1cafc4
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicVariableHeadersTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class ToDynamicVariableHeadersTest extends ContextTestSupport {
+
+ @Test
+ public void testSend() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isEqualTo("CamelCamel");
+
+ template.sendBodyAndHeader("direct:send", "World", "where", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testReceive() throws Exception {
+ getMockEndpoint("mock:after").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBodyAndHeader("direct:receive", "World", "where", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testSendAndReceive() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBodyAndHeader("direct:sendAndReceive", "World", "where", "foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:send")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .toD("direct:${header.where}", "hello", null)
+ .to("mock:result");
+
+ from("direct:receive")
+ .toD("direct:${header.where}", null, "bye")
+ .to("mock:after")
+ .setBody(simple("${variable:bye}"))
+ .to("mock:result");
+
+ from("direct:sendAndReceive")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .toD("direct:${header.where}", "hello", "bye")
+ .to("mock:result");
+
+ from("direct:foo")
+ .setHeader("echo", simple("${body}${body}"))
+ .transform().simple("Bye ${body}");
+ }
+ };
+ }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableHeadersTest.java
new file mode 100644
index 00000000000..dbadd6732e7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToVariableHeadersTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class ToVariableHeadersTest extends ContextTestSupport {
+
+ @Test
+ public void testSend() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isEqualTo("CamelCamel");
+
+ template.sendBody("direct:send", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testReceive() throws Exception {
+ getMockEndpoint("mock:after").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:after").message(0).header("echo").isNull();
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBody("direct:receive", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testSendAndReceive() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("World");
+ getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+ getMockEndpoint("mock:result").message(0).header("echo").isNull();
+
+ template.sendBody("direct:sendAndReceive", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:send")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .toV("direct:foo", "hello", null)
+ .to("mock:result");
+
+ from("direct:receive")
+ .toV("direct:foo", null, "bye")
+ .to("mock:after")
+ .setBody(variable("bye"))
+ .to("mock:result");
+
+ from("direct:sendAndReceive")
+ .setVariable("hello", simple("Camel"))
+ .to("mock:before")
+ .toV("direct:foo", "hello", "bye")
+ .to("mock:result");
+
+ from("direct:foo")
+ .setHeader("echo", simple("${body}${body}"))
+ .transform().simple("Bye ${body}");
+ }
+ };
+ }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index a3eb8c98162..85c34ecf86a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1117,7 +1117,8 @@ public final class ExchangeHelper {
}
Object body = exchange.getMessage().getBody();
// do a defensive copy of the headers
- Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory().newMap(exchange.getMessage().getHeaders());
+ Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory()
+ .newMap(exchange.getMessage().getHeaders());
if (repo != null) {
repo.setVariable(name, body);
repo.setVariable(name + ".headers", map);