You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/08/08 11:59:47 UTC

[camel] branch camel-2.22.x updated: CAMEL-12638: FluentProducerTemplate should be thread-safe (#2459)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.22.x by this push:
     new 60a1b18   CAMEL-12638: FluentProducerTemplate should be thread-safe (#2459)
60a1b18 is described below

commit 60a1b189e0b29ddb2b151b29aa20ae5af3a0432c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 8 13:57:58 2018 +0200

     CAMEL-12638: FluentProducerTemplate should be thread-safe (#2459)
    
    CAMEL-12638: FluentProducerTemplate should be thread-safe.
---
 .../builder/DefaultFluentProducerTemplate.java     | 44 ++++++++++++----------
 1 file changed, 25 insertions(+), 19 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
index b1af75d..b59af2f 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -39,10 +39,13 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 
 public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
+
+    // transient state of headers and body which needs to be thread local scoped to be thread-safe
+    private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
+    private final ThreadLocal<Object> body = new ThreadLocal<>();
+
     private final CamelContext context;
     private final ClassValue<ConvertBodyProcessor> resultProcessors;
-    private Map<String, Object> headers;
-    private Object body;
     private Optional<Consumer<ProducerTemplate>> templateCustomizer;
     private Optional<Supplier<Exchange>> exchangeSupplier;
     private Optional<Supplier<Processor>> processorSupplier;
@@ -133,43 +136,45 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withHeader(String key, Object value) {
-        if (headers == null) {
-            headers = new HashMap<>();
+        Map<String, Object> map = headers.get();
+        if (map == null) {
+            map = new HashMap<>();
+            headers.set(map);
         }
 
-        headers.put(key, value);
+        map.put(key, value);
 
         return this;
     }
 
     @Override
     public FluentProducerTemplate clearHeaders() {
-        if (headers != null) {
-            headers.clear();
-        }
+        headers.remove();
 
         return this;
     }
 
     @Override
     public FluentProducerTemplate withBody(Object body) {
-        this.body = body;
+        this.body.set(body);
 
         return this;
     }
 
     @Override
     public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
-        this.body = type != null
+        Object b = type != null
             ? context.getTypeConverter().convertTo(type, body)
             : body;
 
+        this.body.set(b);
+
         return this;
     }
 
     @Override
     public FluentProducerTemplate clearBody() {
-        this.body = null;
+        body.remove();
 
         return this;
     }
@@ -265,17 +270,17 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         final Endpoint target = target();
 
         Future<T> result;
-        if (ObjectHelper.isNotEmpty(headers)) {
+        if (ObjectHelper.isNotEmpty(headers.get())) {
             // Make a copy of the headers and body so that async processing won't
             // be invalidated by subsequent reuse of the template
-            final Map<String, Object> headersCopy = new HashMap<>(headers);
-            final Object bodyCopy = body;
+            final Map<String, Object> headersCopy = new HashMap<>(headers.get());
+            final Object bodyCopy = body.get();
 
             result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
         } else {
             // Make a copy of the body so that async processing won't be
             // invalidated by subsequent reuse of the template
-            final Object bodyCopy = body;
+            final Object bodyCopy = body.get();
 
             result = template().asyncRequestBody(target, bodyCopy, type);
         }
@@ -335,14 +340,14 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     private Processor defaultProcessor() {
         return exchange -> {
-            ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll);
-            ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody);
+            ObjectHelper.ifNotEmpty(headers.get(), exchange.getIn().getHeaders()::putAll);
+            ObjectHelper.ifNotEmpty(body.get(), exchange.getIn()::setBody);
         };
     }
 
     private Processor defaultAsyncProcessor() {
-        final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
-        final Object bodyCopy = this.body;
+        final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers.get()) ? new HashMap<>(this.headers.get()) : null;
+        final Object bodyCopy = this.body.get();
 
         return exchange -> {
             ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
@@ -371,6 +376,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     protected void doStop() throws Exception {
+        clearAll();
         ServiceHelper.stopService(template);
     }
 }