You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:22 UTC
(camel) 06/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a7dca4d3ba2ebf1109f28e89e181fe2b4d998d9e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 13:10:03 2024 +0100
CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
.../src/main/java/org/apache/camel/Exchange.java | 2 +-
.../VariableRepository.java => VariableAware.java} | 25 ++---------
.../org/apache/camel/spi/VariableRepository.java | 11 +----
.../java/org/apache/camel/processor/Enricher.java | 3 +-
.../org/apache/camel/processor/PollEnricher.java | 2 +-
.../camel/processor/SendDynamicProcessor.java | 2 +-
.../org/apache/camel/processor/SendProcessor.java | 13 +++---
.../processor/PollEnrichVariableHeadersTest.java | 10 +----
.../org/apache/camel/support/ExchangeHelper.java | 48 ++++++++++++----------
docs/user-manual/modules/ROOT/pages/variables.adoc | 4 +-
10 files changed, 49 insertions(+), 71 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 3aa5175e076..3495a865391 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -64,7 +64,7 @@ import org.apache.camel.spi.annotations.ConstantProvider;
* details.
*/
@ConstantProvider("org.apache.camel.ExchangeConstantProvider")
-public interface Exchange {
+public interface Exchange extends VariableAware {
String AUTHENTICATION = "CamelAuthentication";
String AUTHENTICATION_FAILURE_POLICY_ID = "CamelAuthenticationFailurePolicyId";
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java b/core/camel-api/src/main/java/org/apache/camel/VariableAware.java
similarity index 61%
copy from core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java
copy to core/camel-api/src/main/java/org/apache/camel/VariableAware.java
index 153b3502135..ab82303117c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java
+++ b/core/camel-api/src/main/java/org/apache/camel/VariableAware.java
@@ -14,27 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.spi;
-
-import org.apache.camel.StaticService;
+package org.apache.camel;
/**
- * Repository for storing and accessing variables.
+ * An interface to represent an object that supports variables.
*/
-public interface VariableRepository extends StaticService {
-
- /**
- * The id of this repository.
- */
- String getId();
+public interface VariableAware {
/**
* Returns a variable by name.
*
- * If the variable is of type {@link org.apache.camel.StreamCache} then the repository should ensure to reset the
- * stream cache before returning the value, to ensure the content can be read by the Camel end user and would be
- * re-readable next time.
- *
* @param name the name of the variable
* @return the value of the given variable or <tt>null</tt> if there is no variable for the given name
*/
@@ -48,12 +37,4 @@ public interface VariableRepository extends StaticService {
*/
void setVariable(String name, Object value);
- /**
- * Removes the given variable
- *
- * @param name of the variable
- * @return the old value of the variable, or <tt>null</tt> if there was no variable for the given name
- */
- Object removeVariable(String name);
-
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java
index 153b3502135..fba280fcda1 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/VariableRepository.java
@@ -17,11 +17,12 @@
package org.apache.camel.spi;
import org.apache.camel.StaticService;
+import org.apache.camel.VariableAware;
/**
* Repository for storing and accessing variables.
*/
-public interface VariableRepository extends StaticService {
+public interface VariableRepository extends StaticService, VariableAware {
/**
* The id of this repository.
@@ -40,14 +41,6 @@ public interface VariableRepository extends StaticService {
*/
Object getVariable(String name);
- /**
- * Sets a variable
- *
- * @param name of the variable
- * @param value the value of the variable
- */
- void setVariable(String name, Object value);
-
/**
* Removes the given variable
*
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 9d641b01a05..5dcabc96fac 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -224,7 +224,8 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
if (aggregatedExchange != null) {
if (variableReceive != null) {
// result should be stored in variable instead of message body
- ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
+ exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 4005d83834c..d939963dd26 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -352,7 +352,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
if (aggregatedExchange != null) {
if (variableReceive != null) {
// result should be stored in variable instead of message body
- ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 76633de094d..d877f180684 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -238,7 +238,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
// result should be stored in variable instead of message body
if (variableReceive != null) {
- ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index d8acea55531..b04e75a88fb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -131,11 +131,10 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
// if you want to permanently to change the MEP then use .setExchangePattern in the DSL
final ExchangePattern existingPattern = exchange.getPattern();
- // if we should store the received message body in a variable,
- // then we need to preserve the original message body
+ // when using variables then we need to remember original data
Object body = null;
Map<String, Object> headers = null;
- if (variableReceive != null) {
+ if (variableSend != null || variableReceive != null) {
try {
body = exchange.getMessage().getBody();
// do a defensive copy of the headers
@@ -181,7 +180,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
try {
// result should be stored in variable instead of message body/headers
if (variableReceive != null) {
- ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
+ exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
}
@@ -202,6 +202,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
if (variableSend != null) {
Object value = ExchangeHelper.getVariable(exchange, variableSend);
exchange.getMessage().setBody(value);
+ // TODO: empty headers or
+
}
LOG.debug(">>>> {} {}", destination, exchange);
@@ -239,7 +241,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
exchange.setPattern(existingPattern);
// result should be stored in variable instead of message body/headers
if (variableReceive != null) {
- ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
+ exchange.getMessage());
exchange.getMessage().setBody(originalBody);
exchange.getMessage().setHeaders(originalHeaders);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
index 852e3b0cd13..e754ec464ac 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java
@@ -16,11 +16,8 @@
*/
package org.apache.camel.processor;
-import java.util.Map;
-
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class PollEnrichVariableHeadersTest extends ContextTestSupport {
@@ -33,13 +30,8 @@ public class PollEnrichVariableHeadersTest extends ContextTestSupport {
getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+ getMockEndpoint("mock:result").expectedVariableReceived("bye.header.echo", "CamelCamel");
getMockEndpoint("mock:result").message(0).header("echo").isNull();
- getMockEndpoint("mock:result").whenAnyExchangeReceived(e -> {
- Map m = e.getVariable("bye.headers", Map.class);
- Assertions.assertNotNull(m);
- Assertions.assertEquals(1, m.size());
- Assertions.assertEquals("CamelCamel", m.get("echo"));
- });
template.sendBody("direct:receive", "World");
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 02df6ccce7b..6ba9fc6bb02 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -47,6 +47,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConversionException;
+import org.apache.camel.VariableAware;
import org.apache.camel.WrappedFile;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.UnitOfWork;
@@ -1085,23 +1086,31 @@ public final class ExchangeHelper {
* @param value the value of the variable
*/
public static void setVariable(Exchange exchange, String name, Object value) {
+ VariableRepository repo = null;
String id = StringHelper.before(name, ":");
if (id != null) {
VariableRepositoryFactory factory
= exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
- VariableRepository repo = factory.getVariableRepository(id);
+ repo = factory.getVariableRepository(id);
if (repo != null) {
name = StringHelper.after(name, ":");
- repo.setVariable(name, value);
} else {
throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
}
- } else {
- exchange.setVariable(name, value);
}
+ VariableAware va = repo != null ? repo : exchange;
+ va.setVariable(name, value);
}
- public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, String name) {
+ /**
+ * Sets the variable from the given message body and headers
+ *
+ * @param exchange the exchange
+ * @param name the variable name. Can be prefixed with repo-id:name to lookup the variable from a specific
+ * repository. If no repo-id is provided, then the variable is set on the exchange
+ * @param message the message with the body and headers as source values
+ */
+ public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, String name, Message message) {
VariableRepository repo = null;
String id = StringHelper.before(name, ":");
if (id != null) {
@@ -1113,16 +1122,15 @@ public final class ExchangeHelper {
}
name = StringHelper.after(name, ":");
}
- Object body = exchange.getMessage().getBody();
- // do a defensive copy of the headers
- Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory()
- .newMap(exchange.getMessage().getHeaders());
- if (repo != null) {
- repo.setVariable(name, body);
- repo.setVariable(name + ".headers", map);
- } else {
- exchange.setVariable(name, body);
- exchange.setVariable(name + ".headers", map);
+ VariableAware va = repo != null ? repo : exchange;
+
+ // set body and headers as variables
+ Object body = message.getBody();
+ va.setVariable(name, body);
+ for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
+ String key = name + ".header." + header.getKey();
+ Object value = header.getValue();
+ va.setVariable(key, value);
}
}
@@ -1135,22 +1143,20 @@ public final class ExchangeHelper {
* @return the variable
*/
public static Object getVariable(Exchange exchange, String name) {
- Object answer;
+ VariableRepository repo = null;
String id = StringHelper.before(name, ":");
if (id != null) {
VariableRepositoryFactory factory
= exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
- VariableRepository repo = factory.getVariableRepository(id);
+ repo = factory.getVariableRepository(id);
if (repo != null) {
name = StringHelper.after(name, ":");
- answer = repo.getVariable(name);
} else {
throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist");
}
- } else {
- answer = exchange.getVariable(name);
}
- return answer;
+ VariableAware va = repo != null ? repo : exchange;
+ return va.getVariable(name);
}
/**
diff --git a/docs/user-manual/modules/ROOT/pages/variables.adoc b/docs/user-manual/modules/ROOT/pages/variables.adoc
index f52dbcc9687..c277a9b37ec 100644
--- a/docs/user-manual/modules/ROOT/pages/variables.adoc
+++ b/docs/user-manual/modules/ROOT/pages/variables.adoc
@@ -314,9 +314,11 @@ And the variable contains all the data received from the remote HTTP service sep
[source,properties]
----
myVar=Bye World
-myVar.headers.level=gold
+myVar.header.level=gold
----
+IMPORTANT: Notice the headers are stored with the syntax `variable.header.key`. In the example above the variable name is `myVar`,
+and the header key is `level`, which gives: `myVar.header.level`.
=== Using variable to store a copy of the incoming message body