You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 09:07:18 UTC

(camel) 02/16: CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var-headers
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0cba30cf1abdf61a06f8f85e0c730fd09a61ca5b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 29 10:12:58 2024 +0100

    CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables
---
 .../camel/processor/SendDynamicProcessor.java      | 50 +++++++++++-----------
 .../org/apache/camel/processor/SendProcessor.java  | 18 ++++----
 .../org/apache/camel/support/ExchangeHelper.java   | 25 +++++++++++
 .../camel/support/processor/MarshalProcessor.java  |  1 -
 .../support/processor/UnmarshalProcessor.java      |  1 -
 5 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 13620cc6ce6..19ab67b9421 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -44,6 +44,8 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * Processor for forwarding exchanges to a dynamic endpoint destination.
  *
@@ -101,8 +103,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
 
     @Override
     public boolean process(Exchange exchange, final AsyncCallback callback) {
-        // TODO: variables
-
         if (!isStarted()) {
             exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
             callback.done(true);
@@ -179,9 +179,11 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
         // if we should store the received message body in a variable,
         // then we need to preserve the original message body
         Object body = null;
+        Map<String, Object> headers = null;
         if (variableReceive != null) {
             try {
                 body = exchange.getMessage().getBody();
+                headers = exchange.getMessage().getHeaders();
             } catch (Exception throwable) {
                 exchange.setException(throwable);
                 callback.done(true);
@@ -189,6 +191,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
             }
         }
         final Object originalBody = body;
+        final Map<String, Object> originalHeaders = headers;
 
         // send the exchange to the destination using the producer cache
         final Processor preProcessor = preAwareProcessor;
@@ -204,7 +207,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
                 }
                 // replace message body with variable
                 if (variableSend != null) {
-                    // it may be a global variable
                     Object value = ExchangeHelper.getVariable(exchange, variableSend);
                     exchange.getMessage().setBody(value);
                 }
@@ -217,30 +219,28 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
             }
 
             LOG.debug(">>>> {} {}", endpoint, e);
-            return p.process(target, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // restore previous MEP
-                    target.setPattern(existingPattern);
-                    try {
-                        if (postProcessor != null) {
-                            postProcessor.process(target);
-                        }
-                    } catch (Exception e) {
-                        target.setException(e);
-                    }
-                    // stop endpoint if prototype as it was only used once
-                    if (stopEndpoint) {
-                        ServiceHelper.stopAndShutdownService(endpoint);
-                    }
-                    // result should be stored in variable instead of message body
-                    if (variableReceive != null) {
-                        Object value = exchange.getMessage().getBody();
-                        ExchangeHelper.setVariable(exchange, variableReceive, value);
-                        exchange.getMessage().setBody(originalBody);
+            return p.process(target, doneSync -> {
+                // restore previous MEP
+                target.setPattern(existingPattern);
+                try {
+                    if (postProcessor != null) {
+                        postProcessor.process(target);
                     }
-                    // signal we are done
-                    c.done(doneSync);
+                } catch (Exception e1) {
+                    target.setException(e1);
+                }
+                // stop endpoint if prototype as it was only used once
+                if (stopEndpoint) {
+                    ServiceHelper.stopAndShutdownService(endpoint);
+                }
+                // result should be stored in variable instead of message body
+                if (variableReceive != null) {
+                    ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
+                    exchange.getMessage().setBody(originalBody);
+                    exchange.getMessage().setHeaders(originalHeaders);
                 }
+                // signal we are done
+                c.done(doneSync);
             });
         });
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 960450d8d44..907482a70e0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.AsyncCallback;
@@ -131,9 +132,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
         // if we should store the received message body in a variable,
         // then we need to preserve the original message body
         Object body = null;
+        Map<String, Object> headers = null;
         if (variableReceive != null) {
             try {
                 body = exchange.getMessage().getBody();
+                headers = exchange.getMessage().getHeaders();
             } catch (Exception throwable) {
                 exchange.setException(throwable);
                 callback.done(true);
@@ -141,6 +144,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
             }
         }
         final Object originalBody = body;
+        final Map<String, Object> originalHeaders = headers;
 
         if (extendedStatistics) {
             counter.incrementAndGet();
@@ -172,11 +176,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
             if (newCallback) {
                 ac = doneSync -> {
                     try {
-                        // result should be stored in variable instead of message body
+                        // result should be stored in variable instead of message body/headers
                         if (variableReceive != null) {
-                            Object value = exchange.getMessage().getBody();
-                            ExchangeHelper.setVariable(exchange, variableReceive, value);
+                            ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
                             exchange.getMessage().setBody(originalBody);
+                            exchange.getMessage().setHeaders(originalHeaders);
                         }
                         // restore previous MEP
                         target.setPattern(existingPattern);
@@ -193,7 +197,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
             try {
                 // replace message body with variable
                 if (variableSend != null) {
-                    // it may be a global variable
                     Object value = ExchangeHelper.getVariable(exchange, variableSend);
                     exchange.getMessage().setBody(value);
                 }
@@ -220,7 +223,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
 
             // replace message body with variable
             if (variableSend != null) {
-                // it may be a global variable
                 Object value = ExchangeHelper.getVariable(exchange, variableSend);
                 exchange.getMessage().setBody(value);
             }
@@ -232,11 +234,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
                     (producer, ex, cb) -> producer.process(ex, doneSync -> {
                         // restore previous MEP
                         exchange.setPattern(existingPattern);
-                        // result should be stored in variable instead of message body
+                        // result should be stored in variable instead of message body/headers
                         if (variableReceive != null) {
-                            Object value = exchange.getMessage().getBody();
-                            ExchangeHelper.setVariable(exchange, variableReceive, value);
+                            ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive);
                             exchange.getMessage().setBody(originalBody);
+                            exchange.getMessage().setHeaders(originalHeaders);
                         }
                         // signal we are done
                         cb.done(doneSync);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index d55f4e1ce57..a3eb8c98162 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -1102,6 +1102,31 @@ public final class ExchangeHelper {
         }
     }
 
+    public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, String name) {
+        VariableRepository repo = null;
+        String id = StringHelper.before(name, ":");
+        if (id != null) {
+            VariableRepositoryFactory factory
+                    = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
+            repo = factory.getVariableRepository(id);
+            if (repo == null) {
+                exchange.setException(
+                        new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"));
+            }
+            name = StringHelper.after(name, ":");
+        }
+        Object body = exchange.getMessage().getBody();
+        // do a defensive copy of the headers
+        Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory().newMap(exchange.getMessage().getHeaders());
+        if (repo != null) {
+            repo.setVariable(name, body);
+            repo.setVariable(name + ".headers", map);
+        } else {
+            exchange.setVariable(name, body);
+            exchange.setVariable(name + ".headers", map);
+        }
+    }
+
     /**
      * Gets the variable
      *
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
index f1a299106ef..107180f753c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
@@ -59,7 +59,6 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
         final Object originalBody = in.getBody();
         Object body = originalBody;
         if (variableSend != null) {
-            // it may be a global variable
             body = ExchangeHelper.getVariable(exchange, variableSend);
         }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
index dbfb9e7206c..8b93cabd473 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
@@ -68,7 +68,6 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
             final Object originalBody = in.getBody();
             Object body = originalBody;
             if (variableSend != null) {
-                // it may be a global variable
                 body = ExchangeHelper.getVariable(exchange, variableSend);
             }
             final Message out;