You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/02 14:50:18 UTC
(camel) 01/04: CAMEL-20289: camel-core - FluentProducerTemplate - Add withVariable and withProperty
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch fluent
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 61881f2eb207d575628c4f51d5ef8c0523b39ae4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jan 2 14:54:26 2024 +0100
CAMEL-20289: camel-core - FluentProducerTemplate - Add withVariable and withProperty
---
.../org/apache/camel/FluentProducerTemplate.java | 42 ++++++++
.../impl/engine/DefaultFluentProducerTemplate.java | 119 ++++++++++++++++++---
.../org/apache/camel/support/ExchangeHelper.java | 27 +++++
3 files changed, 172 insertions(+), 16 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index e26f177f9a2..adf0a3b3f38 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -191,6 +191,48 @@ public interface FluentProducerTemplate extends Service {
*/
FluentProducerTemplate withHeader(String key, Object value);
+ /**
+ * Set the exchange properties
+ *
+ * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+ * withBody/withHeaders to construct the message to be sent.
+ *
+ * @param properties the exchange properties
+ */
+ FluentProducerTemplate withExchangeProperties(Map<String, Object> properties);
+
+ /**
+ * Set the exchange property
+ *
+ * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+ * withBody/withHeaders to construct the message to be sent.
+ *
+ * @param key the key of the exchange property
+ * @param value the value of the exchange property
+ */
+ FluentProducerTemplate withExchangeProperty(String key, Object value);
+
+ /**
+ * Set the variables
+ *
+ * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+ * withBody/withHeaders to construct the message to be sent.
+ *
+ * @param variables the variables
+ */
+ FluentProducerTemplate withVariables(Map<String, Object> variables);
+
+ /**
+ * Set the exchange property
+ *
+ * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of
+ * withBody/withHeaders to construct the message to be sent.
+ *
+ * @param key the key of the variable
+ * @param value the value of the variable
+ */
+ FluentProducerTemplate withVariable(String key, Object value);
+
/**
* Set the message body
*
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index e82381e50b5..851bf2ce845 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -51,8 +51,10 @@ import org.apache.camel.util.ObjectHelper;
*/
public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
- // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
+ // transient state of endpoint, headers, exchange properties, variables, and body which needs to be thread local scoped to be thread-safe
private Map<String, Object> headers;
+ private Map<String, Object> exchangeProperties;
+ private Map<String, Object> variables;
private Object body;
private Supplier<Exchange> exchangeSupplier;
private Supplier<Processor> processorSupplier;
@@ -207,6 +209,74 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
return clone;
}
+ @Override
+ public FluentProducerTemplate withExchangeProperties(Map<String, Object> properties) {
+ DefaultFluentProducerTemplate clone = checkCloned();
+
+ if (clone.processorSupplier != null) {
+ throw new IllegalArgumentException("Cannot use both withExchangeProperties and withProcessor with FluentProducerTemplate");
+ }
+
+ Map<String, Object> map = clone.exchangeProperties;
+ if (map == null) {
+ map = new LinkedHashMap<>();
+ clone.exchangeProperties = map;
+ }
+ map.putAll(properties);
+ return clone;
+ }
+
+ @Override
+ public FluentProducerTemplate withExchangeProperty(String key, Object value) {
+ DefaultFluentProducerTemplate clone = checkCloned();
+
+ if (clone.processorSupplier != null) {
+ throw new IllegalArgumentException("Cannot use both withExchangeProperty and withProcessor with FluentProducerTemplate");
+ }
+
+ Map<String, Object> map = clone.exchangeProperties;
+ if (map == null) {
+ map = new LinkedHashMap<>();
+ clone.exchangeProperties = map;
+ }
+ map.put(key, value);
+ return clone;
+ }
+
+ @Override
+ public FluentProducerTemplate withVariables(Map<String, Object> variables) {
+ DefaultFluentProducerTemplate clone = checkCloned();
+
+ if (clone.processorSupplier != null) {
+ throw new IllegalArgumentException("Cannot use both withVariables and withProcessor with FluentProducerTemplate");
+ }
+
+ Map<String, Object> map = clone.variables;
+ if (map == null) {
+ map = new LinkedHashMap<>();
+ clone.variables = map;
+ }
+ map.putAll(variables);
+ return clone;
+ }
+
+ @Override
+ public FluentProducerTemplate withVariable(String key, Object value) {
+ DefaultFluentProducerTemplate clone = checkCloned();
+
+ if (clone.processorSupplier != null) {
+ throw new IllegalArgumentException("Cannot use both withVariable and withProcessor with FluentProducerTemplate");
+ }
+
+ Map<String, Object> map = clone.variables;
+ if (map == null) {
+ map = new LinkedHashMap<>();
+ clone.variables = map;
+ }
+ map.put(key, value);
+ return clone;
+ }
+
@Override
public FluentProducerTemplate withBody(Object body) {
DefaultFluentProducerTemplate clone = checkCloned();
@@ -387,21 +457,24 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = clone.target();
- Future<T> result;
- if (ObjectHelper.isNotEmpty(clone.headers)) {
- // Make a copy of the headers and body so that async processing won't
- // be invalidated by subsequent reuse of the template
- final Map<String, Object> headersCopy = new HashMap<>(clone.headers);
- final Object bodyCopy = clone.body;
-
- result = clone.template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
- } else {
- // Make a copy of the and body so that async processing won't be
- // invalidated by subsequent reuse of the template
- final Object bodyCopy = clone.body;
-
- result = clone.template().asyncRequestBody(target, bodyCopy, type);
- }
+ Future<T> result =
+ clone.template().asyncSend(target, exchange -> {
+ // Make a copy of the headers and body so that async processing won't
+ // be invalidated by subsequent reuse of the template
+ Object bodyCopy = clone.body;
+
+ exchange.setPattern(ExchangePattern.InOut);
+ exchange.getMessage().setBody(bodyCopy);
+ if (clone.headers != null) {
+ exchange.getMessage().setHeaders(new HashMap<>(clone.headers));
+ }
+ if (clone.exchangeProperties != null) {
+ exchange.getProperties().putAll(clone.exchangeProperties);
+ }
+ if (clone.variables != null) {
+ clone.variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+ }
+ }).thenApply(answer -> answer.getContext().getTypeConverter().convertTo(type, answer));
// reset cloned flag so when we use it again it has to set values again
cloned = false;
@@ -537,22 +610,36 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
if (headers != null) {
exchange.getIn().getHeaders().putAll(headers);
}
+ if (exchangeProperties != null) {
+ exchange.getProperties().putAll(exchangeProperties);
+ }
if (body != null) {
exchange.getIn().setBody(body);
}
+ if (variables != null) {
+ variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+ }
};
}
private Processor defaultAsyncProcessor() {
final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
+ final Map<String, Object> propertiesCopy = ObjectHelper.isNotEmpty(this.exchangeProperties) ? new HashMap<>(this.exchangeProperties) : null;
+ final Map<String, Object> variablesCopy = ObjectHelper.isNotEmpty(this.variables) ? new HashMap<>(this.variables) : null;
final Object bodyCopy = this.body;
return exchange -> {
if (headersCopy != null) {
exchange.getIn().getHeaders().putAll(headersCopy);
}
+ if (propertiesCopy != null) {
+ exchange.getProperties().putAll(propertiesCopy);
+ }
if (bodyCopy != null) {
exchange.getIn().setBody(bodyCopy);
}
+ if (variablesCopy != null) {
+ variablesCopy.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v));
+ }
};
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 9f45cd44431..004d0dacb99 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -50,6 +50,8 @@ import org.apache.camel.TypeConversionException;
import org.apache.camel.WrappedFile;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.spi.VariableRepository;
+import org.apache.camel.spi.VariableRepositoryFactory;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.Scanner;
@@ -1070,4 +1072,29 @@ public final class ExchangeHelper {
exchange.getOut().setBody(body);
}
}
+
+ /**
+ * Sets the variable
+ *
+ * @param exchange the exchange
+ * @param name the variable name. Can be prefixed with repo-id:name to lookup the variable from a specific
+ * repository. If no repo-id is provided, then the variable is set on the exchange
+ * @param value the value of the variable
+ */
+ public static void setVariable(Exchange exchange, String name, Object value) {
+ String id = StringHelper.before(name, ":");
+ if (id != null) {
+ VariableRepositoryFactory factory = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class);
+ VariableRepository repo = factory.getVariableRepository(id);
+ if (repo != null) {
+ name = StringHelper.after(name, ":");
+ repo.setVariable(name, value);
+ } else {
+ exchange.setException(
+ new IllegalArgumentException("VariableRepository with id: " + id + " does not exists"));
+ }
+ } else {
+ exchange.setVariable(name, value);
+ }
+ }
}