You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:18 UTC
(camel) 02/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0cba30cf1abdf61a06f8f85e0c730fd09a61ca5b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 10:12:58 2024 +0100
CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
.../camel/processor/SendDynamicProcessor.java | 50 +++++++++++-----------
.../org/apache/camel/processor/SendProcessor.java | 18 ++++----
.../org/apache/camel/support/ExchangeHelper.java | 25 +++++++++++
.../camel/support/processor/MarshalProcessor.java | 1 -
.../support/processor/UnmarshalProcessor.java | 1 -
5 files changed, 60 insertions(+), 35 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 13620cc6ce6..19ab67b9421 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -44,6 +44,8 @@ import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
* Processor for forwarding exchanges to a dynamic endpoint destination.
*
@@ -101,8 +103,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
@Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
- // TODO: variables
-
if (!isStarted()) {
exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
callback.done(true);
@@ -179,9 +179,11 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
// if we should store the received message body in a variable,
// then we need to preserve the original message body
Object body = null;
+ Map<String, Object> headers = null;
if (variableReceive != null) {
try {
body = exchange.getMessage().getBody();
+ headers = exchange.getMessage().getHeaders();
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -189,6 +191,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
}
final Object originalBody = body;
+ final Map<String, Object> originalHeaders = headers;
// send the exchange to the destination using the producer cache
final Processor preProcessor = preAwareProcessor;
@@ -204,7 +207,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
// replace message body with variable
if (variableSend != null) {
- // it may be a global variable
Object value = ExchangeHelper.getVariable(exchange, variableSend);
exchange.getMessage().setBody(value);
}
@@ -217,30 +219,28 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
LOG.debug(">>>> {} {}", endpoint, e);
- return p.process(target, new AsyncCallback() {
- public void done(boolean doneSync) {
- // restore previous MEP
- target.setPattern(existingPattern);
- try {
- if (postProcessor != null) {
- postProcessor.process(target);
- }
- } catch (Exception e) {
- target.setException(e);
- }
- // stop endpoint if prototype as it was only used once
- if (stopEndpoint) {
- ServiceHelper.stopAndShutdownService(endpoint);
- }
- // result should be stored in variable instead of message body
- if (variableReceive != null) {
- Object value = exchange.getMessage().getBody();
- ExchangeHelper.setVariable(exchange, variableReceive, value);
- exchange.getMessage().setBody(originalBody);
+ return p.process(target, doneSync -> {
+ // restore previous MEP
+ target.setPattern(existingPattern);
+ try {
+ if (postProcessor != null) {
+ postProcessor.process(target);
}
- // signal we are done
- c.done(doneSync);
+ } catch (Exception e1) {
+ target.setException(e1);
+ }
+ // stop endpoint if prototype as it was only used once
+ if (stopEndpoint) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
+ // result should be stored in variable instead of message body
+ if (variableReceive != null) {
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+ exchange.getMessage().setBody(originalBody);
+ exchange.getMessage().setHeaders(originalHeaders);
}
+ // signal we are done
+ c.done(doneSync);
});
});
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 960450d8d44..907482a70e0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.AsyncCallback;
@@ -131,9 +132,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
// if we should store the received message body in a variable,
// then we need to preserve the original message body
Object body = null;
+ Map<String, Object> headers = null;
if (variableReceive != null) {
try {
body = exchange.getMessage().getBody();
+ headers = exchange.getMessage().getHeaders();
} catch (Exception throwable) {
exchange.setException(throwable);
callback.done(true);
@@ -141,6 +144,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
}
}
final Object originalBody = body;
+ final Map<String, Object> originalHeaders = headers;
if (extendedStatistics) {
counter.incrementAndGet();
@@ -172,11 +176,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
if (newCallback) {
ac = doneSync -> {
try {
- // result should be stored in variable instead of message body
+ // result should be stored in variable instead of message body/headers
if (variableReceive != null) {
- Object value = exchange.getMessage().getBody();
- ExchangeHelper.setVariable(exchange, variableReceive, value);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
exchange.getMessage().setBody(originalBody);
+ exchange.getMessage().setHeaders(originalHeaders);
}
// restore previous MEP
target.setPattern(existingPattern);
@@ -193,7 +197,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
try {
// replace message body with variable
if (variableSend != null) {
- // it may be a global variable
Object value = ExchangeHelper.getVariable(exchange, variableSend);
exchange.getMessage().setBody(value);
}
@@ -220,7 +223,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
// replace message body with variable
if (variableSend != null) {
- // it may be a global variable
Object value = ExchangeHelper.getVariable(exchange, variableSend);
exchange.getMessage().setBody(value);
}
@@ -232,11 +234,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
(producer, ex, cb) -> producer.process(ex, doneSync -> {
// restore previous MEP
exchange.setPattern(existingPattern);
- // result should be stored in variable instead of message body
+ // result should be stored in variable instead of message body/headers
if (variableReceive != null) {
- Object value = exchange.getMessage().getBody();
- ExchangeHelper.setVariable(exchange, variableReceive, value);
+ ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
exchange.getMessage().setBody(originalBody);
+ exchange.getMessage().setHeaders(originalHeaders);
}
// signal we are done
cb.done(doneSync);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index d55f4e1ce57..a3eb8c98162 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1102,6 +1102,31 @@ public final class ExchangeHelper {
}
}
+    /**
+     * Stores the current message body and a copy of the current message headers as variables.
+     * <p>
+     * The body is stored under the given name, and the headers are stored under the name suffixed with
+     * {@code .headers}. If the name is prefixed with a repository id ({@code id:name}), the values are stored
+     * in the {@link VariableRepository} with that id; otherwise they are stored as variables on the exchange.
+     * <p>
+     * If a repository id is given but no such repository exists, an {@link IllegalArgumentException} is set
+     * on the exchange and nothing is stored.
+     *
+     * @param exchange the exchange whose current message body and headers are read
+     * @param name     the variable name, optionally prefixed with a repository id and a colon
+     */
+    public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, String name) {
+        VariableRepository repo = null;
+        String id = StringHelper.before(name, ":");
+        if (id != null) {
+            VariableRepositoryFactory factory
+                    = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
+            repo = factory.getVariableRepository(id);
+            if (repo == null) {
+                exchange.setException(
+                        new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+                // the requested repository is unknown: do not fall through and silently store
+                // the values as exchange variables under the un-prefixed name
+                return;
+            }
+            name = StringHelper.after(name, ":");
+        }
+        Object body = exchange.getMessage().getBody();
+        // do a defensive copy of the headers
+        Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory()
+                .newMap(exchange.getMessage().getHeaders());
+        if (repo != null) {
+            repo.setVariable(name, body);
+            repo.setVariable(name + ".headers", map);
+        } else {
+            exchange.setVariable(name, body);
+            exchange.setVariable(name + ".headers", map);
+        }
+    }
+
/**
* Gets the variable
*
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
index f1a299106ef..107180f753c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
@@ -59,7 +59,6 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
final Object originalBody = in.getBody();
Object body = originalBody;
if (variableSend != null) {
- // it may be a global variable
body = ExchangeHelper.getVariable(exchange, variableSend);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
index dbfb9e7206c..8b93cabd473 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
@@ -68,7 +68,6 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
final Object originalBody = in.getBody();
Object body = originalBody;
if (variableSend != null) {
- // it may be a global variable
body = ExchangeHelper.getVariable(exchange, variableSend);
}
final Message out;