You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/01 09:02:07 UTC
[camel] branch master updated: CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ce44147 CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.
ce44147 is described below
commit ce441479b98eedd4371d011c5230fc5b1acd0e79
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Aug 1 11:01:47 2019 +0200
CAMEL-12968: Fix FluentProducerTemplate to be thread-safe when you also set endpoints and other states.
---
.../impl/engine/DefaultFluentProducerTemplate.java | 82 +++++++++++++---------
1 file changed, 47 insertions(+), 35 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 0d8c579..9a19e65 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -17,8 +17,8 @@
package org.apache.camel.impl.engine;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -40,29 +40,24 @@ import org.apache.camel.util.ObjectHelper;
public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
- // transient state of headers and body which needs to be thread local scoped to be thread-safe
+ // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
private final ThreadLocal<Map<String, Object>> headers = new ThreadLocal<>();
private final ThreadLocal<Object> body = new ThreadLocal<>();
+ private final ThreadLocal<Endpoint> endpoint = new ThreadLocal<>();
+ private final ThreadLocal<Supplier<Exchange>> exchangeSupplier = new ThreadLocal<>();
+ private final ThreadLocal<Supplier<Processor>> processorSupplier = new ThreadLocal<>();
+ private final ThreadLocal<Consumer<ProducerTemplate>> templateCustomizer = new ThreadLocal<>();
private final CamelContext context;
private final ClassValue<ConvertBodyProcessor> resultProcessors;
- private Optional<Consumer<ProducerTemplate>> templateCustomizer;
- private Optional<Supplier<Exchange>> exchangeSupplier;
- private Optional<Supplier<Processor>> processorSupplier;
- private Optional<Endpoint> endpoint;
- private Optional<Endpoint> defaultEndpoint;
+ private Endpoint defaultEndpoint;
private int maximumCacheSize;
private boolean eventNotifierEnabled;
private volatile ProducerTemplate template;
public DefaultFluentProducerTemplate(CamelContext context) {
this.context = context;
- this.endpoint = Optional.empty();
- this.defaultEndpoint = Optional.empty();
this.eventNotifierEnabled = true;
- this.templateCustomizer = Optional.empty();
- this.exchangeSupplier = Optional.empty();
- this.processorSupplier = Optional.empty();
this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
@Override
protected ConvertBodyProcessor computeValue(Class<?> type) {
@@ -98,12 +93,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public Endpoint getDefaultEndpoint() {
- return defaultEndpoint.orElse(null);
+ return defaultEndpoint;
}
@Override
public void setDefaultEndpoint(Endpoint defaultEndpoint) {
- this.defaultEndpoint = Optional.ofNullable(defaultEndpoint);
+ this.defaultEndpoint = defaultEndpoint;
}
@Override
@@ -138,7 +133,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
public FluentProducerTemplate withHeader(String key, Object value) {
Map<String, Object> map = headers.get();
if (map == null) {
- map = new HashMap<>();
+ map = new LinkedHashMap<>();
headers.set(map);
}
@@ -181,7 +176,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
- this.templateCustomizer = Optional.of(templateCustomizer);
+ this.templateCustomizer.set(templateCustomizer);
return this;
}
@@ -192,7 +187,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
- this.exchangeSupplier = Optional.of(exchangeSupplier);
+ this.exchangeSupplier.set(exchangeSupplier);
return this;
}
@@ -203,7 +198,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
- this.processorSupplier = Optional.of(processorSupplier);
+ this.processorSupplier.set(processorSupplier);
return this;
}
@@ -214,7 +209,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate to(Endpoint endpoint) {
- this.endpoint = Optional.of(endpoint);
+ this.endpoint.set(endpoint);
return this;
}
@@ -230,7 +225,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
@SuppressWarnings("unchecked")
public <T> T request(Class<T> type) throws CamelExecutionException {
- if (exchangeSupplier.isPresent()) {
+ if (exchangeSupplier.get() != null) {
throw new IllegalArgumentException("withExchange not supported on FluentProducerTemplate.request method. Use send method instead.");
}
@@ -238,19 +233,19 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
final Endpoint target = target();
// Create the default processor if not provided.
- final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
+ final Processor processorSupplier = this.processorSupplier.get() != null ? this.processorSupplier.get().get() : defaultProcessor();
T result;
if (type == Exchange.class) {
- result = (T)template().request(target, processorSupplier.get());
+ result = (T)template().request(target, processorSupplier);
} else if (type == Message.class) {
- Exchange exchange = template().request(target, processorSupplier.get());
+ Exchange exchange = template().request(target, processorSupplier);
result = (T)exchange.getMessage();
} else {
Exchange exchange = template().send(
target,
ExchangePattern.InOut,
- processorSupplier.get(),
+ processorSupplier,
resultProcessors.get(type)
);
@@ -301,9 +296,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = target();
- return exchangeSupplier.isPresent()
- ? template().send(target, exchangeSupplier.get().get())
- : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
+ Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null;
+ if (exchange != null) {
+ return template().send(target, exchange);
+ } else {
+ Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultProcessor();
+ return template().send(target, processor);
+ }
}
@Override
@@ -311,9 +310,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = target();
- return exchangeSupplier.isPresent()
- ? template().asyncSend(target, exchangeSupplier.get().get())
- : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
+ Exchange exchange = exchangeSupplier.get() != null ? exchangeSupplier.get().get() : null;
+ if (exchange != null) {
+ return template().asyncSend(target, exchange);
+ } else {
+ Processor processor = processorSupplier.get() != null ? processorSupplier.get().get() : defaultAsyncProcessor();
+ return template().asyncSend(target, processor);
+ }
}
// ************************
@@ -334,9 +337,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
if (template == null) {
template = context.createProducerTemplate(maximumCacheSize);
- defaultEndpoint.ifPresent(template::setDefaultEndpoint);
+ if (defaultEndpoint != null) {
+ template.setDefaultEndpoint(defaultEndpoint);
+ }
template.setEventNotifierEnabled(eventNotifierEnabled);
- templateCustomizer.ifPresent(tc -> tc.accept(template));
+ if (templateCustomizer.get() != null) {
+ templateCustomizer.get().accept(template);
+ }
}
return template;
@@ -360,11 +367,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
}
private Endpoint target() {
- if (endpoint.isPresent()) {
+ if (endpoint.get() != null) {
return endpoint.get();
}
- if (defaultEndpoint.isPresent()) {
- return defaultEndpoint.get();
+ if (defaultEndpoint != null) {
+ return defaultEndpoint;
}
throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
@@ -381,6 +388,11 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
protected void doStop() throws Exception {
clearAll();
+ this.endpoint.remove();
+ this.exchangeSupplier.remove();
+ this.processorSupplier.remove();
+ this.templateCustomizer.remove();
+
ServiceHelper.stopService(template);
}
}