You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/23 15:30:05 UTC

(camel) 15/19: CAMEL-19749: EIPs should make it easy to use together with variables.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch var
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7320558aeeac4c0c45fd993e15eae2c0c83d0a32
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 23 13:28:19 2024 +0100

    CAMEL-19749: EIPs should make it easy to use together with variables.
---
 .../resources/org/apache/camel/model/marshal.json  |   4 +-
 .../org/apache/camel/model/unmarshal.json          |   4 +-
 .../org/apache/camel/builder/DataFormatClause.java |  43 +++++++-
 .../org/apache/camel/model/MarshalDefinition.java  |  48 +++++++++
 .../apache/camel/model/UnmarshalDefinition.java    |  44 +++++++++
 .../org/apache/camel/reifier/MarshalReifier.java   |   5 +-
 .../org/apache/camel/reifier/UnmarshalReifier.java |   6 +-
 .../camel/processor/MarshalVariableTest.java       | 109 +++++++++++++++++++++
 .../camel/processor/UnmarshalVariableTest.java     | 107 ++++++++++++++++++++
 .../api/management/mbean/ManagedMarshalMBean.java  |   6 ++
 .../management/mbean/ManagedUnmarshalMBean.java    |   9 ++
 .../camel/management/mbean/ManagedMarshal.java     |  10 ++
 .../camel/management/mbean/ManagedUnmarshal.java   |  15 +++
 .../camel/support/processor/MarshalProcessor.java  |  34 ++++++-
 .../support/processor/UnmarshalProcessor.java      |  53 ++++++++--
 .../java/org/apache/camel/xml/in/ModelParser.java  |  20 ++--
 .../java/org/apache/camel/xml/out/ModelWriter.java |   4 +
 .../org/apache/camel/yaml/out/ModelWriter.java     |   4 +
 18 files changed, 503 insertions(+), 22 deletions(-)

diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/marshal.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/marshal.json
index 178597ebd63..3f33b46e27e 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/marshal.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/marshal.json
@@ -15,6 +15,8 @@
     "id": { "index": 0, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "index": 1, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" },
     "disabled": { "index": 2, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." },
-    "dataFormatType": { "index": 3, "kind": "element", "displayName": "Data Format Type", "required": true, "type": "object", "javaType": "org.apache.camel.model.DataFormatDefinition", "oneOf": [ "asn1", "avro", "barcode", "base64", "bindy", "cbor", "crypto", "csv", "custom", "fhirJson", "fhirXml", "flatpack", "grok", "gzipDeflater", "hl7", "ical", "jacksonXml", "jaxb", "json", "jsonApi", "lzf", "mimeMultipart", "parquetAvro", "pgp", "protobuf", "rss", "soap", "swiftMt", "swiftMx", "sysl [...]
+    "dataFormatType": { "index": 3, "kind": "element", "displayName": "Data Format Type", "required": true, "type": "object", "javaType": "org.apache.camel.model.DataFormatDefinition", "oneOf": [ "asn1", "avro", "barcode", "base64", "bindy", "cbor", "crypto", "csv", "custom", "fhirJson", "fhirXml", "flatpack", "grok", "gzipDeflater", "hl7", "ical", "jacksonXml", "jaxb", "json", "jsonApi", "lzf", "mimeMultipart", "parquetAvro", "pgp", "protobuf", "rss", "soap", "swiftMt", "swiftMx", "sysl [...]
+    "variableSend": { "index": 4, "kind": "attribute", "displayName": "Variable Send", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a variable to store the received message body (only body, not headers). This is handy for easy access to the received message body via variables. Important: When using receive variable then the received body is stored only in this variable and not on the  [...]
+    "variableReceive": { "index": 5, "kind": "attribute", "displayName": "Variable Receive", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a variable to store the received message body (only body, not headers). This is handy for easy access to the received message body via variables. Important: When using receive variable then the received body is stored only in this variable and not o [...]
   }
 }
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/unmarshal.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/unmarshal.json
index fe21bf9f735..202b7a8f74b 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/unmarshal.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/unmarshal.json
@@ -16,6 +16,8 @@
     "description": { "index": 1, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" },
     "disabled": { "index": 2, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." },
     "dataFormatType": { "index": 3, "kind": "element", "displayName": "Data Format Type", "required": true, "type": "object", "javaType": "org.apache.camel.model.DataFormatDefinition", "oneOf": [ "asn1", "avro", "barcode", "base64", "bindy", "cbor", "crypto", "csv", "custom", "fhirJson", "fhirXml", "flatpack", "grok", "gzipDeflater", "hl7", "ical", "jacksonXml", "jaxb", "json", "jsonApi", "lzf", "mimeMultipart", "parquetAvro", "pgp", "protobuf", "rss", "soap", "swiftMt", "swiftMx", "sysl [...]
-    "allowNullBody": { "index": 4, "kind": "attribute", "displayName": "Allow Null Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Indicates whether null is allowed as value of a body to unmarshall." }
+    "variableSend": { "index": 4, "kind": "attribute", "displayName": "Variable Send", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a variable to store the received message body (only body, not headers). This is handy for easy access to the received message body via variables. Important: When using receive variable then the received body is stored only in this variable and not on the  [...]
+    "variableReceive": { "index": 5, "kind": "attribute", "displayName": "Variable Receive", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a variable to store the received message body (only body, not headers). This is handy for easy access to the received message body via variables. Important: When using receive variable then the received body is stored only in this variable and not o [...]
+    "allowNullBody": { "index": 6, "kind": "attribute", "displayName": "Allow Null Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Indicates whether null is allowed as value of a body to unmarshall." }
   }
 }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
index 59f2e1649e4..73c6d1eb97a 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/DataFormatClause.java
@@ -21,7 +21,9 @@ import java.util.Map;
 import org.w3c.dom.Node;
 
 import org.apache.camel.model.DataFormatDefinition;
+import org.apache.camel.model.MarshalDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.UnmarshalDefinition;
 import org.apache.camel.model.dataformat.ASN1DataFormat;
 import org.apache.camel.model.dataformat.AvroDataFormat;
 import org.apache.camel.model.dataformat.AvroLibrary;
@@ -69,6 +71,8 @@ import org.apache.camel.support.jsse.KeyStoreParameters;
 public class DataFormatClause<T extends ProcessorDefinition<?>> {
     private final T processorType;
     private final Operation operation;
+    private String variableSend;
+    private String variableReceive;
     private boolean allowNullBody;
 
     /**
@@ -1349,13 +1353,48 @@ public class DataFormatClause<T extends ProcessorDefinition<?>> {
         return this;
     }
 
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public DataFormatClause<T> variableSend(String variableSend) {
+        this.variableSend = variableSend;
+        return this;
+    }
+
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public DataFormatClause<T> variableReceive(String variableReceive) {
+        this.variableReceive = variableReceive;
+        return this;
+    }
+
     @SuppressWarnings("unchecked")
     private T dataFormat(DataFormatDefinition dataFormatType) {
         switch (operation) {
             case Unmarshal:
-                return (T) processorType.unmarshal(dataFormatType, allowNullBody);
+                UnmarshalDefinition unmarshal = new UnmarshalDefinition(dataFormatType);
+                if (allowNullBody) {
+                    unmarshal.allowNullBody(true);
+                }
+                unmarshal.setVariableReceive(variableReceive);
+                unmarshal.setVariableSend(variableSend);
+                processorType.addOutput(unmarshal);
+                return processorType;
             case Marshal:
-                return (T) processorType.marshal(dataFormatType);
+                MarshalDefinition marshal = new MarshalDefinition(dataFormatType);
+                marshal.setVariableReceive(variableReceive);
+                marshal.setVariableSend(variableSend);
+                processorType.addOutput(marshal);
+                return processorType;
             default:
                 throw new IllegalArgumentException("Unknown DataFormat operation: " + operation);
         }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/MarshalDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/MarshalDefinition.java
index 0e87066c92d..75bc403dd83 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/MarshalDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/MarshalDefinition.java
@@ -18,6 +18,7 @@ package org.apache.camel.model;
 
 import jakarta.xml.bind.annotation.XmlAccessType;
 import jakarta.xml.bind.annotation.XmlAccessorType;
+import jakarta.xml.bind.annotation.XmlAttribute;
 import jakarta.xml.bind.annotation.XmlElement;
 import jakarta.xml.bind.annotation.XmlElements;
 import jakarta.xml.bind.annotation.XmlRootElement;
@@ -114,6 +115,10 @@ public class MarshalDefinition extends NoOutputDefinition<MarshalDefinition> imp
             @XmlElement(name = "zipDeflater", type = ZipDeflaterDataFormat.class),
             @XmlElement(name = "zipFile", type = ZipFileDataFormat.class) })
     private DataFormatDefinition dataFormatType;
+    @XmlAttribute
+    private String variableSend;
+    @XmlAttribute
+    private String variableReceive;
 
     public MarshalDefinition() {
     }
@@ -154,4 +159,47 @@ public class MarshalDefinition extends NoOutputDefinition<MarshalDefinition> imp
         this.dataFormatType = dataFormatType;
     }
 
+    public String getVariableSend() {
+        return variableSend;
+    }
+
+    public void setVariableSend(String variableSend) {
+        this.variableSend = variableSend;
+    }
+
+    public String getVariableReceive() {
+        return variableReceive;
+    }
+
+    public void setVariableReceive(String variableReceive) {
+        this.variableReceive = variableReceive;
+    }
+
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public MarshalDefinition variableReceive(String variableReceive) {
+        setVariableReceive(variableReceive);
+        return this;
+    }
+
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public MarshalDefinition variableSend(String variableSend) {
+        setVariableSend(variableSend);
+        return this;
+    }
+
 }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/UnmarshalDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/UnmarshalDefinition.java
index f948dc85819..cb603a8d004 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/UnmarshalDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/UnmarshalDefinition.java
@@ -116,6 +116,10 @@ public class UnmarshalDefinition extends NoOutputDefinition<UnmarshalDefinition>
             @XmlElement(name = "zipFile", type = ZipFileDataFormat.class) })
     private DataFormatDefinition dataFormatType;
     @XmlAttribute
+    private String variableSend;
+    @XmlAttribute
+    private String variableReceive;
+    @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false")
     private String allowNullBody;
 
@@ -162,6 +166,22 @@ public class UnmarshalDefinition extends NoOutputDefinition<UnmarshalDefinition>
         this.dataFormatType = dataFormatType;
     }
 
+    public String getVariableSend() {
+        return variableSend;
+    }
+
+    public void setVariableSend(String variableSend) {
+        this.variableSend = variableSend;
+    }
+
+    public String getVariableReceive() {
+        return variableReceive;
+    }
+
+    public void setVariableReceive(String variableReceive) {
+        this.variableReceive = variableReceive;
+    }
+
     public String getAllowNullBody() {
         return allowNullBody;
     }
@@ -176,6 +196,30 @@ public class UnmarshalDefinition extends NoOutputDefinition<UnmarshalDefinition>
     // Fluent API
     // -------------------------------------------------------------------------
 
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public UnmarshalDefinition variableReceive(String variableReceive) {
+        setVariableReceive(variableReceive);
+        return this;
+    }
+
+    /**
+     * To use a variable to store the received message body (only body, not headers). This is handy for easy access to
+     * the received message body via variables.
+     *
+     * Important: When using receive variable then the received body is stored only in this variable and <b>not</b> on
+     * the current {@link org.apache.camel.Message}.
+     */
+    public UnmarshalDefinition variableSend(String variableSend) {
+        setVariableSend(variableSend);
+        return this;
+    }
+
     /**
      * Indicates whether {@code null} is allowed as value of a body to unmarshall.
      *
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MarshalReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MarshalReifier.java
index dc79f718ce4..e39e96ea9cc 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MarshalReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MarshalReifier.java
@@ -33,7 +33,10 @@ public class MarshalReifier extends ProcessorReifier<MarshalDefinition> {
     @Override
     public Processor createProcessor() {
         DataFormat dataFormat = DataFormatReifier.getDataFormat(camelContext, definition.getDataFormatType());
-        return new MarshalProcessor(dataFormat);
+        MarshalProcessor answer = new MarshalProcessor(dataFormat);
+        answer.setVariableSend(parseString(definition.getVariableSend()));
+        answer.setVariableReceive(parseString(definition.getVariableReceive()));
+        return answer;
     }
 
 }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/UnmarshalReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/UnmarshalReifier.java
index 025fed2a1c6..f1bd1fc79ef 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/UnmarshalReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/UnmarshalReifier.java
@@ -33,6 +33,10 @@ public class UnmarshalReifier extends ProcessorReifier<UnmarshalDefinition> {
     @Override
     public Processor createProcessor() {
         DataFormat dataFormat = DataFormatReifier.getDataFormat(camelContext, definition.getDataFormatType());
-        return new UnmarshalProcessor(dataFormat, Boolean.TRUE == parseBoolean(definition.getAllowNullBody()));
+        UnmarshalProcessor answer
+                = new UnmarshalProcessor(dataFormat, Boolean.TRUE == parseBoolean(definition.getAllowNullBody()));
+        answer.setVariableSend(parseString(definition.getVariableSend()));
+        answer.setVariableReceive(parseString(definition.getVariableReceive()));
+        return answer;
     }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MarshalVariableTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MarshalVariableTest.java
new file mode 100644
index 00000000000..55f140697d7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/MarshalVariableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.jupiter.api.Test;
+
+public class MarshalVariableTest extends ContextTestSupport {
+
+    @Test
+    public void testSend() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+
+        template.sendBody("direct:send", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testReceive() throws Exception {
+        getMockEndpoint("mock:after").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+
+        template.sendBody("direct:receive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSendAndReceive() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+
+        template.sendBody("direct:sendAndReceive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getRegistry().bind("myDF", new MyByeDataFormat());
+
+                from("direct:send")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .marshal().variableSend("hello").custom("myDF")
+                        .to("mock:result");
+
+                from("direct:receive")
+                        .marshal().variableReceive("bye").custom("myDF")
+                        .to("mock:after")
+                        .setBody(simple("${variable:bye}"))
+                        .to("mock:result");
+
+                from("direct:sendAndReceive")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .marshal().variableSend("hello").variableReceive("bye").custom("myDF")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyByeDataFormat extends ServiceSupport implements DataFormat {
+
+        @Override
+        public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+            String line = "Bye " + graph.toString();
+            stream.write(line.getBytes());
+        }
+
+        @Override
+        public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+            // noop
+            return null;
+        }
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/UnmarshalVariableTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/UnmarshalVariableTest.java
new file mode 100644
index 00000000000..7eb9aeecd9a
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/UnmarshalVariableTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.jupiter.api.Test;
+
+public class UnmarshalVariableTest extends ContextTestSupport {
+
+    @Test
+    public void testSend() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel");
+
+        template.sendBody("direct:send", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testReceive() throws Exception {
+        getMockEndpoint("mock:after").expectedBodiesReceived("World");
+        getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World");
+
+        template.sendBody("direct:receive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testSendAndReceive() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("World");
+        getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("World");
+        getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel");
+
+        template.sendBody("direct:sendAndReceive", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getRegistry().bind("myDF", new MyByeDataFormat());
+
+                from("direct:send")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .unmarshal().variableSend("hello").custom("myDF")
+                        .to("mock:result");
+
+                from("direct:receive")
+                        .unmarshal().variableReceive("bye").custom("myDF")
+                        .to("mock:after")
+                        .setBody(simple("${variable:bye}"))
+                        .to("mock:result");
+
+                from("direct:sendAndReceive")
+                        .setVariable("hello", simple("Camel"))
+                        .to("mock:before")
+                        .unmarshal().variableSend("hello").variableReceive("bye").custom("myDF")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyByeDataFormat extends ServiceSupport implements DataFormat {
+
+        @Override
+        public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+            // noop
+        }
+
+        @Override
+        public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+            return "Bye " + exchange.getContext().getTypeConverter().convertTo(String.class, exchange, stream);
+        }
+    }
+}
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedMarshalMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedMarshalMBean.java
index dc6a8144cb5..9d8e8684581 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedMarshalMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedMarshalMBean.java
@@ -23,4 +23,10 @@ public interface ManagedMarshalMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "The name of the DataFormat to use for marshal")
     String getDataFormatName();
 
+    @ManagedAttribute(description = "Variable as the source for the message body to send")
+    String getVariableSend();
+
+    @ManagedAttribute(description = "Variable to store the received message body (only body, not headers)")
+    String getVariableReceive();
+
 }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedUnmarshalMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedUnmarshalMBean.java
index 550fcec168c..88ac0aa9991 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedUnmarshalMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedUnmarshalMBean.java
@@ -23,4 +23,13 @@ public interface ManagedUnmarshalMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "The name of the DataFormat to use for unmarshal")
     String getDataFormatName();
 
+    @ManagedAttribute(description = "Variable as the source for the message body to send")
+    String getVariableSend();
+
+    @ManagedAttribute(description = "Variable to store the received message body (only body, not headers)")
+    String getVariableReceive();
+
+    @ManagedAttribute(description = "Whether null is allowed as value of a body to unmarshall")
+    boolean isAllowNullBody();
+
 }
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedMarshal.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedMarshal.java
index 40745fc2674..1722cc94c80 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedMarshal.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedMarshal.java
@@ -44,4 +44,14 @@ public class ManagedMarshal extends ManagedProcessor implements ManagedMarshalMB
         }
         return name;
     }
+
+    @Override
+    public String getVariableSend() {
+        return processor.getVariableSend();
+    }
+
+    @Override
+    public String getVariableReceive() {
+        return processor.getVariableReceive();
+    }
 }
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedUnmarshal.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedUnmarshal.java
index cdfd71ac1f6..90159743815 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedUnmarshal.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedUnmarshal.java
@@ -44,4 +44,19 @@ public class ManagedUnmarshal extends ManagedProcessor implements ManagedUnmarsh
         }
         return name;
     }
+
+    @Override
+    public String getVariableSend() {
+        return processor.getVariableSend();
+    }
+
+    @Override
+    public String getVariableReceive() {
+        return processor.getVariableReceive();
+    }
+
+    @Override
+    public boolean isAllowNullBody() {
+        return processor.isAllowNullBody();
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
index 0c116c40e54..f1a299106ef 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.builder.OutputStreamBuilder;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -39,6 +40,8 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
     private String routeId;
     private CamelContext camelContext;
     private final DataFormat dataFormat;
+    private String variableSend;
+    private String variableReceive;
 
     public MarshalProcessor(DataFormat dataFormat) {
         this.dataFormat = dataFormat;
@@ -53,7 +56,12 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
         OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
 
         Message in = exchange.getIn();
-        Object body = in.getBody();
+        final Object originalBody = in.getBody();
+        Object body = originalBody;
+        if (variableSend != null) {
+            // it may be a global variable
+            body = ExchangeHelper.getVariable(exchange, variableSend);
+        }
 
         // lets setup the out message before we invoke the dataFormat
         // so that it can mutate it if necessary
@@ -62,7 +70,13 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
 
         try {
             dataFormat.marshal(exchange, body, osb);
-            out.setBody(osb.build());
+            Object result = osb.build();
+            // result should be stored in variable instead of message body
+            if (variableReceive != null) {
+                ExchangeHelper.setVariable(exchange, variableReceive, result);
+            } else {
+                out.setBody(result);
+            }
         } catch (Exception e) {
             // remove OUT message, as an exception occurred
             exchange.setOut(null);
@@ -113,6 +127,22 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
         this.camelContext = camelContext;
     }
 
+    public String getVariableSend() {
+        return variableSend;
+    }
+
+    public void setVariableSend(String variableSend) {
+        this.variableSend = variableSend;
+    }
+
+    public String getVariableReceive() {
+        return variableReceive;
+    }
+
+    public void setVariableReceive(String variableReceive) {
+        this.variableReceive = variableReceive;
+    }
+
     @Override
     protected void doStart() throws Exception {
         // inject CamelContext on data format
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
index 25e2acba8ac..dbfb9e7206c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -44,6 +45,8 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
     private CamelContext camelContext;
     private final DataFormat dataFormat;
     private final boolean allowNullBody;
+    private String variableSend;
+    private String variableReceive;
 
     public UnmarshalProcessor(DataFormat dataFormat) {
         this(dataFormat, false);
@@ -62,17 +65,20 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
         Object result = null;
         try {
             final Message in = exchange.getIn();
+            final Object originalBody = in.getBody();
+            Object body = originalBody;
+            if (variableSend != null) {
+                // it may be a global variable
+                body = ExchangeHelper.getVariable(exchange, variableSend);
+            }
             final Message out;
-            if (allowNullBody && in.getBody() == null) {
+            if (allowNullBody && body == null) {
                 // The body is null, and it is an allowed value so let's skip the unmarshalling
                 out = exchange.getOut();
             } else {
-                Object body = in.getBody();
-
                 // lets set up the out message before we invoke the dataFormat so that it can mutate it if necessary
                 out = exchange.getOut();
                 out.copyFrom(in);
-
                 result = dataFormat.unmarshal(exchange, body);
             }
             if (result instanceof Exchange) {
@@ -82,11 +88,22 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
                             "The returned exchange " + result + " is not the same as " + exchange
                                                     + " provided to the DataFormat");
                 }
-            } else if (result instanceof Message) {
-                // the dataformat has probably set headers, attachments, etc. so let's use it as the outbound payload
-                exchange.setOut((Message) result);
+            } else if (result instanceof Message msg) {
+                // result should be stored in variable instead of message body
+                if (variableReceive != null) {
+                    Object value = msg.getBody();
+                    ExchangeHelper.setVariable(exchange, variableReceive, value);
+                } else {
+                    // the dataformat has probably set headers, attachments, etc. so let's use it as the outbound payload
+                    exchange.setOut(msg);
+                }
             } else {
-                out.setBody(result);
+                // result should be stored in variable instead of message body
+                if (variableReceive != null) {
+                    ExchangeHelper.setVariable(exchange, variableReceive, result);
+                } else {
+                    out.setBody(result);
+                }
             }
         } catch (Exception e) {
             // remove OUT message, as an exception occurred
@@ -142,6 +159,26 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
         this.camelContext = camelContext;
     }
 
+    public boolean isAllowNullBody() {
+        return allowNullBody;
+    }
+
+    public String getVariableSend() {
+        return variableSend;
+    }
+
+    public void setVariableSend(String variableSend) {
+        this.variableSend = variableSend;
+    }
+
+    public String getVariableReceive() {
+        return variableReceive;
+    }
+
+    public void setVariableReceive(String variableReceive) {
+        this.variableReceive = variableReceive;
+    }
+
     @Override
     protected void doStart() throws Exception {
         // inject CamelContext on data format
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index a976bf9ba52..8d9af8a709d 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -603,8 +603,14 @@ public class ModelParser extends BaseParser {
         }, outputExpressionNodeElementHandler(), noValueHandler());
     }
     protected MarshalDefinition doParseMarshalDefinition() throws IOException, XmlPullParserException {
-        return doParse(new MarshalDefinition(),
-            processorDefinitionAttributeHandler(), (def, key) -> {
+        return doParse(new MarshalDefinition(), (def, key, val) -> {
+            switch (key) {
+                case "variableReceive": def.setVariableReceive(val); break;
+                case "variableSend": def.setVariableSend(val); break;
+                default: return processorDefinitionAttributeHandler().accept(def, key, val);
+            }
+            return true;
+        }, (def, key) -> {
             DataFormatDefinition v = doParseDataFormatDefinitionRef(key);
             if (v != null) { 
                 def.setDataFormatType(v);
@@ -1593,11 +1599,13 @@ public class ModelParser extends BaseParser {
     }
     protected UnmarshalDefinition doParseUnmarshalDefinition() throws IOException, XmlPullParserException {
         return doParse(new UnmarshalDefinition(), (def, key, val) -> {
-            if ("allowNullBody".equals(key)) {
-                def.setAllowNullBody(val);
-                return true;
+            switch (key) {
+                case "allowNullBody": def.setAllowNullBody(val); break;
+                case "variableReceive": def.setVariableReceive(val); break;
+                case "variableSend": def.setVariableSend(val); break;
+                default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
-            return processorDefinitionAttributeHandler().accept(def, key, val);
+            return true;
         }, (def, key) -> {
             DataFormatDefinition v = doParseDataFormatDefinitionRef(key);
             if (v != null) { 
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
index dafa67a7831..f87189f2681 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
@@ -1540,6 +1540,8 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
+        doWriteAttribute("variableReceive", def.getVariableReceive());
+        doWriteAttribute("variableSend", def.getVariableSend());
         doWriteElement(null, def.getDataFormatType(), (n, v) -> {
             switch (v.getClass().getSimpleName()) {
                 case "ASN1DataFormat" -> doWriteASN1DataFormat("asn1", (ASN1DataFormat) def.getDataFormatType());
@@ -2493,6 +2495,8 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
+        doWriteAttribute("variableReceive", def.getVariableReceive());
+        doWriteAttribute("variableSend", def.getVariableSend());
         doWriteAttribute("allowNullBody", def.getAllowNullBody());
         doWriteElement(null, def.getDataFormatType(), (n, v) -> {
             switch (v.getClass().getSimpleName()) {
diff --git a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
index 7d60741bd7b..fc9f63d4dfe 100644
--- a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
+++ b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
@@ -1540,6 +1540,8 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
+        doWriteAttribute("variableReceive", def.getVariableReceive());
+        doWriteAttribute("variableSend", def.getVariableSend());
         doWriteElement(null, def.getDataFormatType(), (n, v) -> {
             switch (v.getClass().getSimpleName()) {
                 case "ASN1DataFormat" -> doWriteASN1DataFormat("asn1", (ASN1DataFormat) def.getDataFormatType());
@@ -2493,6 +2495,8 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
+        doWriteAttribute("variableReceive", def.getVariableReceive());
+        doWriteAttribute("variableSend", def.getVariableSend());
         doWriteAttribute("allowNullBody", def.getAllowNullBody());
         doWriteElement(null, def.getDataFormatType(), (n, v) -> {
             switch (v.getClass().getSimpleName()) {