You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/02 14:50:18 UTC

(camel) 01/04: CAMEL-20289: camel-core - FluentProducerTemplate - Add withVariable and withProperty

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch fluent
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 61881f2eb207d575628c4f51d5ef8c0523b39ae4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 2 14:54:26 2024 +0100

    CAMEL-20289: camel-core - FluentProducerTemplate - Add withVariable and withProperty
---
 .../org/apache/camel/FluentProducerTemplate.java   |  42 ++++++++
 .../impl/engine/DefaultFluentProducerTemplate.java | 119 ++++++++++++++++++---
 .../org/apache/camel/support/ExchangeHelper.java   |  27 +++++
 3 files changed, 172 insertions(+), 16 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index e26f177f9a2..adf0a3b3f38 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -191,6 +191,48 @@ public interface FluentProducerTemplate extends Service {
      */
     FluentProducerTemplate withHeader(String key, Object value);
 
+    /**
+     * Set the exchange properties
+     *
+     * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+     * withBody/withHeaders to construct the message to be sent.
+     *
+     * @param properties the exchange properties
+     */
+    FluentProducerTemplate withExchangeProperties(Map<String, Object> properties);
+
+    /**
+     * Set the exchange property
+     *
+     * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+     * withBody/withHeaders to construct the message to be sent.
+     *
+     * @param key   the key of the exchange property
+     * @param value the value of the exchange property
+     */
+    FluentProducerTemplate withExchangeProperty(String key, Object value);
+
+    /**
+     * Set the variables
+     *
+     * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+     * withBody/withHeaders to construct the message to be sent.
+     *
+     * @param variables the variables
+     */
+    FluentProducerTemplate withVariables(Map<String, Object> variables);
+
+    /**
+     * Set the exchange property
+     *
+     * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+     * withBody/withHeaders to construct the message to be sent.
+     *
+     * @param key   the key of the variable
+     * @param value the value of the variable
+     */
+    FluentProducerTemplate withVariable(String key, Object value);
+
     /**
      * Set the message body
      *
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index e82381e50b5..851bf2ce845 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -51,8 +51,10 @@ import org.apache.camel.util.ObjectHelper;
  */
 public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
 
-    // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
+    // transient state of endpoint, headers, exchange properties, variables, and body which needs to be thread local scoped to be thread-safe
     private Map<String, Object> headers;
+    private Map<String, Object> exchangeProperties;
+    private Map<String, Object> variables;
     private Object body;
     private Supplier<Exchange> exchangeSupplier;
     private Supplier<Processor> processorSupplier;
@@ -207,6 +209,74 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         return clone;
     }
 
+    @Override
+    public FluentProducerTemplate withExchangeProperties(Map<String, Object> properties) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        if (clone.processorSupplier != null) {
+            throw new IllegalArgumentException("Cannot use both withExchangeProperties and withProcessor with FluentProducerTemplate");
+        }
+
+        Map<String, Object> map = clone.exchangeProperties;
+        if (map == null) {
+            map = new LinkedHashMap<>();
+            clone.exchangeProperties = map;
+        }
+        map.putAll(properties);
+        return clone;
+    }
+
+    @Override
+    public FluentProducerTemplate withExchangeProperty(String key, Object value) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        if (clone.processorSupplier != null) {
+            throw new IllegalArgumentException("Cannot use both withExchangeProperty and withProcessor with FluentProducerTemplate");
+        }
+
+        Map<String, Object> map = clone.exchangeProperties;
+        if (map == null) {
+            map = new LinkedHashMap<>();
+            clone.exchangeProperties = map;
+        }
+        map.put(key, value);
+        return clone;
+    }
+
+    @Override
+    public FluentProducerTemplate withVariables(Map<String, Object> variables) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        if (clone.processorSupplier != null) {
+            throw new IllegalArgumentException("Cannot use both withVariables and withProcessor with FluentProducerTemplate");
+        }
+
+        Map<String, Object> map = clone.variables;
+        if (map == null) {
+            map = new LinkedHashMap<>();
+            clone.variables = map;
+        }
+        map.putAll(variables);
+        return clone;
+    }
+
+    @Override
+    public FluentProducerTemplate withVariable(String key, Object value) {
+        DefaultFluentProducerTemplate clone = checkCloned();
+
+        if (clone.processorSupplier != null) {
+            throw new IllegalArgumentException("Cannot use both withVariable and withProcessor with FluentProducerTemplate");
+        }
+
+        Map<String, Object> map = clone.variables;
+        if (map == null) {
+            map = new LinkedHashMap<>();
+            clone.variables = map;
+        }
+        map.put(key, value);
+        return clone;
+    }
+
     @Override
     public FluentProducerTemplate withBody(Object body) {
         DefaultFluentProducerTemplate clone = checkCloned();
@@ -387,21 +457,24 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = clone.target();
 
-        Future<T> result;
-        if (ObjectHelper.isNotEmpty(clone.headers)) {
-            // Make a copy of the headers and body so that async processing won't
-            // be invalidated by subsequent reuse of the template
-            final Map<String, Object> headersCopy = new HashMap<>(clone.headers);
-            final Object bodyCopy = clone.body;
-
-            result = clone.template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
-        } else {
-            // Make a copy of the and body so that async processing won't be
-            // invalidated by subsequent reuse of the template
-            final Object bodyCopy = clone.body;
-
-            result = clone.template().asyncRequestBody(target, bodyCopy, type);
-        }
+        Future<T> result =
+                clone.template().asyncSend(target, exchange -> {
+                    // Make a copy of the headers and body so that async processing won't
+                    // be invalidated by subsequent reuse of the template
+                    Object bodyCopy = clone.body;
+
+                    exchange.setPattern(ExchangePattern.InOut);
+                    exchange.getMessage().setBody(bodyCopy);
+                    if (clone.headers != null) {
+                        exchange.getMessage().setHeaders(new HashMap<>(clone.headers));
+                    }
+                    if (clone.exchangeProperties != null) {
+                        exchange.getProperties().putAll(clone.exchangeProperties);
+                    }
+                    if (clone.variables != null) {
+                        clone.variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+                    }
+                }).thenApply(answer -> answer.getContext().getTypeConverter().convertTo(type, answer));
 
         // reset cloned flag so when we use it again it has to set values again
         cloned = false;
@@ -537,22 +610,36 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
             if (headers != null) {
                 exchange.getIn().getHeaders().putAll(headers);
             }
+            if (exchangeProperties != null) {
+                exchange.getProperties().putAll(exchangeProperties);
+            }
             if (body != null) {
                 exchange.getIn().setBody(body);
             }
+            if (variables != null) {
+                variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+            }
         };
     }
 
     private Processor defaultAsyncProcessor() {
         final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
+        final Map<String, Object> propertiesCopy = ObjectHelper.isNotEmpty(this.exchangeProperties) ? new HashMap<>(this.exchangeProperties) : null;
+        final Map<String, Object> variablesCopy = ObjectHelper.isNotEmpty(this.variables) ? new HashMap<>(this.variables) : null;
         final Object bodyCopy = this.body;
         return exchange -> {
             if (headersCopy != null) {
                 exchange.getIn().getHeaders().putAll(headersCopy);
             }
+            if (propertiesCopy != null) {
+                exchange.getProperties().putAll(propertiesCopy);
+            }
             if (bodyCopy != null) {
                 exchange.getIn().setBody(bodyCopy);
             }
+            if (variablesCopy != null) {
+                variablesCopy.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+            }
         };
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 9f45cd44431..004d0dacb99 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -50,6 +50,8 @@ import org.apache.camel.TypeConversionException;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.spi.VariableRepository;
+import org.apache.camel.spi.VariableRepositoryFactory;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.Scanner;
@@ -1070,4 +1072,29 @@ public final class ExchangeHelper {
             exchange.getOut().setBody(body);
         }
     }
+
+    /**
+     * Sets the variable
+     *
+     * @param exchange  the exchange
+     * @param name  the variable name. Can be prefixed with repo-id:name to lookup the variable from a specific
+     *              repository. If no repo-id is provided, then the variable is set on the exchange
+     * @param value the value of the variable
+     */
+    public static void setVariable(Exchange exchange, String name, Object value) {
+        String id = StringHelper.before(name, ":");
+        if (id != null) {
+            VariableRepositoryFactory factory = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
+            VariableRepository repo = factory.getVariableRepository(id);
+            if (repo != null) {
+                name = StringHelper.after(name, ":");
+                repo.setVariable(name, value);
+            } else {
+                exchange.setException(
+                        new IllegalArgumentException("VariableRepository with id: " + id + " does not exists"));
+            }
+        } else {
+            exchange.setVariable(name, value);
+        }
+    }
 }