You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/01 09:02:07 UTC

[camel] branch master updated: CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ce44147  CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.
ce44147 is described below

commit ce441479b98eedd4371d011c5230fc5b1acd0e79
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Aug 1 11:01:47 2019 +0200

    CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.
---
 .../impl/engine/DefaultFluentProducerTemplate.java | 82 +++++++++++++---------
 1 file changed, 47 insertions(+), 35 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 0d8c579..9a19e65 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -17,8 +17,8 @@
 package org.apache.camel.impl.engine;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -40,29 +40,24 @@ import org.apache.camel.util.ObjectHelper;
 
 public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
 
-    // transient state of headers and body which needs to be thread local scoped to be thread-safe
+    // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
     private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
     private final ThreadLocal<Object> body = new ThreadLocal<>();
+    private final ThreadLocal<Endpoint> endpoint = new ThreadLocal<>();
+    private final ThreadLocal<Supplier<Exchange>> exchangeSupplier = new ThreadLocal<>();
+    private final ThreadLocal<Supplier<Processor>> processorSupplier = new ThreadLocal<>();
+    private final ThreadLocal<Consumer<ProducerTemplate>> templateCustomizer = new ThreadLocal<>();
 
     private final CamelContext context;
     private final ClassValue<ConvertBodyProcessor> resultProcessors;
-    private Optional<Consumer<ProducerTemplate>> templateCustomizer;
-    private Optional<Supplier<Exchange>> exchangeSupplier;
-    private Optional<Supplier<Processor>> processorSupplier;
-    private Optional<Endpoint> endpoint;
-    private Optional<Endpoint> defaultEndpoint;
+    private Endpoint defaultEndpoint;
     private int maximumCacheSize;
     private boolean eventNotifierEnabled;
     private volatile ProducerTemplate template;
 
     public DefaultFluentProducerTemplate(CamelContext context) {
         this.context = context;
-        this.endpoint = Optional.empty();
-        this.defaultEndpoint = Optional.empty();
         this.eventNotifierEnabled = true;
-        this.templateCustomizer = Optional.empty();
-        this.exchangeSupplier = Optional.empty();
-        this.processorSupplier = Optional.empty();
         this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
             @Override
             protected ConvertBodyProcessor computeValue(Class<?> type) {
@@ -98,12 +93,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public Endpoint getDefaultEndpoint() {
-        return defaultEndpoint.orElse(null);
+        return defaultEndpoint;
     }
 
     @Override
     public void setDefaultEndpoint(Endpoint defaultEndpoint) {
-        this.defaultEndpoint = Optional.ofNullable(defaultEndpoint);
+        this.defaultEndpoint = defaultEndpoint;
     }
 
     @Override
@@ -138,7 +133,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     public FluentProducerTemplate withHeader(String key, Object value) {
         Map<String, Object> map = headers.get();
         if (map == null) {
-            map = new HashMap<>();
+            map = new LinkedHashMap<>();
             headers.set(map);
         }
 
@@ -181,7 +176,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
-        this.templateCustomizer = Optional.of(templateCustomizer);
+        this.templateCustomizer.set(templateCustomizer);
         return this;
     }
 
@@ -192,7 +187,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
-        this.exchangeSupplier = Optional.of(exchangeSupplier);
+        this.exchangeSupplier.set(exchangeSupplier);
         return this;
     }
 
@@ -203,7 +198,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
-        this.processorSupplier = Optional.of(processorSupplier);
+        this.processorSupplier.set(processorSupplier);
         return this;
     }
 
@@ -214,7 +209,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate to(Endpoint endpoint) {
-        this.endpoint = Optional.of(endpoint);
+        this.endpoint.set(endpoint);
         return this;
     }
 
@@ -230,7 +225,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     @Override
     @SuppressWarnings("unchecked")
     public <T> T request(Class<T> type) throws CamelExecutionException {
-        if (exchangeSupplier.isPresent()) {
+        if (exchangeSupplier.get() != null) {
             throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead.");
         }
 
@@ -238,19 +233,19 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         final Endpoint target = target();
 
         // Create the default processor if not provided.
-        final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
+        final Processor processorSupplier = this.processorSupplier.get() != null ? this.processorSupplier.get().get() : defaultProcessor();
 
         T result;
         if (type == Exchange.class) {
-            result = (T)template().request(target, processorSupplier.get());
+            result = (T)template().request(target, processorSupplier);
         } else if (type == Message.class) {
-            Exchange exchange = template().request(target, processorSupplier.get());
+            Exchange exchange = template().request(target, processorSupplier);
             result = (T)exchange.getMessage();
         } else {
             Exchange exchange = template().send(
                 target,
                 ExchangePattern.InOut,
-                processorSupplier.get(),
+                processorSupplier,
                 resultProcessors.get(type)
             );
 
@@ -301,9 +296,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = target();
 
-        return exchangeSupplier.isPresent()
-            ? template().send(target, exchangeSupplier.get().get())
-            : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
+        Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null;
+        if (exchange != null) {
+            return template().send(target, exchange);
+        } else {
+            Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultProcessor();
+            return template().send(target, processor);
+        }
     }
 
     @Override
@@ -311,9 +310,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = target();
 
-        return exchangeSupplier.isPresent()
-            ? template().asyncSend(target, exchangeSupplier.get().get())
-            : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
+        Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null;
+        if (exchange != null) {
+            return template().asyncSend(target, exchange);
+        } else {
+            Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultAsyncProcessor();
+            return template().asyncSend(target, processor);
+        }
     }
 
     // ************************
@@ -334,9 +337,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
         if (template == null) {
             template = context.createProducerTemplate(maximumCacheSize);
-            defaultEndpoint.ifPresent(template::setDefaultEndpoint);
+            if (defaultEndpoint != null) {
+                template.setDefaultEndpoint(defaultEndpoint);
+            }
             template.setEventNotifierEnabled(eventNotifierEnabled);
-            templateCustomizer.ifPresent(tc -> tc.accept(template));
+            if (templateCustomizer.get() != null) {
+                templateCustomizer.get().accept(template);
+            }
         }
 
         return template;
@@ -360,11 +367,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     }
 
     private Endpoint target() {
-        if (endpoint.isPresent()) {
+        if (endpoint.get() != null) {
             return endpoint.get();
         }
-        if (defaultEndpoint.isPresent()) {
-            return defaultEndpoint.get();
+        if (defaultEndpoint != null) {
+            return defaultEndpoint;
         }
 
         throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
@@ -381,6 +388,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     @Override
     protected void doStop() throws Exception {
         clearAll();
+        this.endpoint.remove();
+        this.exchangeSupplier.remove();
+        this.processorSupplier.remove();
+        this.templateCustomizer.remove();
+
         ServiceHelper.stopService(template);
     }
 }