You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/08/08 11:59:47 UTC
[camel] branch camel-2.22.x updated: CAMEL-12638:
FluentProducerTemplate should be thread-safe (#2459)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.22.x by this push:
new 60a1b18 CAMEL-12638: FluentProducerTemplate should be thread-safe (#2459)
60a1b18 is described below
commit 60a1b189e0b29ddb2b151b29aa20ae5af3a0432c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 8 13:57:58 2018 +0200
CAMEL-12638: FluentProducerTemplate should be thread-safe (#2459)
CAMEL-12638: FluentProducerTemplate should be thread-safe.
---
.../builder/DefaultFluentProducerTemplate.java | 44 ++++++++++++----------
1 file changed, 25 insertions(+), 19 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
index b1af75d..b59af2f 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -39,10 +39,13 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
+
+ // transient state of headers and body which needs to be thread local scoped to be thread-safe
+ private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
+ private final ThreadLocal<Object> body = new ThreadLocal<>();
+
private final CamelContext context;
private final ClassValue<ConvertBodyProcessor> resultProcessors;
- private Map<String, Object> headers;
- private Object body;
private Optional<Consumer<ProducerTemplate>> templateCustomizer;
private Optional<Supplier<Exchange>> exchangeSupplier;
private Optional<Supplier<Processor>> processorSupplier;
@@ -133,43 +136,45 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withHeader(String key, Object value) {
- if (headers == null) {
- headers = new HashMap<>();
+ Map<String, Object> map = headers.get();
+ if (map == null) {
+ map = new HashMap<>();
+ headers.set(map);
}
- headers.put(key, value);
+ map.put(key, value);
return this;
}
@Override
public FluentProducerTemplate clearHeaders() {
- if (headers != null) {
- headers.clear();
- }
+ headers.remove();
return this;
}
@Override
public FluentProducerTemplate withBody(Object body) {
- this.body = body;
+ this.body.set(body);
return this;
}
@Override
public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
- this.body = type != null
+ Object b = type != null
? context.getTypeConverter().convertTo(type, body)
: body;
+ this.body.set(b);
+
return this;
}
@Override
public FluentProducerTemplate clearBody() {
- this.body = null;
+ body.remove();
return this;
}
@@ -265,17 +270,17 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
final Endpoint target = target();
Future<T> result;
- if (ObjectHelper.isNotEmpty(headers)) {
+ if (ObjectHelper.isNotEmpty(headers.get())) {
// Make a copy of the headers and body so that async processing won't
// be invalidated by subsequent reuse of the template
- final Map<String, Object> headersCopy = new HashMap<>(headers);
- final Object bodyCopy = body;
+ final Map<String, Object> headersCopy = new HashMap<>(headers.get());
+ final Object bodyCopy = body.get();
result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
} else {
// Make a copy of the and body so that async processing won't be
// invalidated by subsequent reuse of the template
- final Object bodyCopy = body;
+ final Object bodyCopy = body.get();
result = template().asyncRequestBody(target, bodyCopy, type);
}
@@ -335,14 +340,14 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
private Processor defaultProcessor() {
return exchange -> {
- ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll);
- ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody);
+ ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll);
+ ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody);
};
}
private Processor defaultAsyncProcessor() {
- final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
- final Object bodyCopy = this.body;
+ final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null;
+ final Object bodyCopy = this.body.get();
return exchange -> {
ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
@@ -371,6 +376,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
protected void doStop() throws Exception {
+ clearAll();
ServiceHelper.stopService(template);
}
}