You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/27 05:21:06 UTC
[camel] branch main updated: CAMEL-16525: camel-core -
FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for
testing and reporting a problem.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9caa384 CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for testing and reporting a problem.
9caa384 is described below
commit 9caa38481f773099ab2118998d54c7583e69619a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 27 07:20:20 2021 +0200
CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for testing and reporting a problem.
---
.../org/apache/camel/FluentProducerTemplate.java | 33 +++++-
.../impl/engine/DefaultFluentProducerTemplate.java | 116 ++++++++++++++++-----
.../camel/builder/FluentProducerTemplateTest.java | 6 +-
3 files changed, 123 insertions(+), 32 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index 068573b..f4de0ce 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -31,7 +31,36 @@ import org.apache.camel.util.ObjectHelper;
* {@link org.apache.camel.CamelExecutionException} while others stores any thrown exception on the returned
* {@link Exchange}. <br/>
* <p/>
- * The {@link FluentProducerTemplate} is <b>thread safe</b>. <br/>
+ * The {@link FluentProducerTemplate} is <b>thread safe</b> with the assumption that its the same (single) thread that
+ * builds the message (via the fluent methods) that also sends the message. <br/>
+ * When using the fluent template its required to chain the methods such as:
+ *
+ * <pre>
+ * FluentProducerTemplate fluent = ...
+ * fluent.withHeader("foo", 123).withHeader("bar", 456).withBody("Hello World").to("kafka:cheese").send();
+ * </pre>
+ *
+ * The following code is <b>wrong</b> (do not do this)
+ *
+ * <pre>
+ * FluentProducerTemplate fluent = ...
+ * fluent.withHeader("foo", 123);
+ * fluent.withHeader("bar", 456);
+ * fluent.withBody("Hello World");
+ * fluent.to("kafka:cheese");
+ * fluent.send();
+ * </pre>
+ *
+ * If you do not want to chain fluent methods you can do as follows:
+ *
+ * <pre>
+ * FluentProducerTemplate fluent = ...
+ * fluent = fluent.withHeader("foo", 123);
+ * fluent = fluent.withHeader("bar", 456);
+ * fluent = fluent.withBody("Hello World");
+ * fluent = fluent.to("kafka:cheese")
+ * fluent.send();
+ * </pre>
* <p/>
* All the methods which sends a message may throw {@link FailedToCreateProducerException} in case the {@link Producer}
* could not be created. Or a {@link NoSuchEndpointException} if the endpoint could not be resolved. There may be other
@@ -49,7 +78,7 @@ import org.apache.camel.util.ObjectHelper;
* <br/>
* <p/>
* Before using the template it must be started. And when you are done using the template, make sure to {@link #stop()}
- * the template. <br/>
+ * the template.<br/>
* <p/>
* <b>Important note on usage:</b> See this
* <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ entry</a> before
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index fbb8f34..182433c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -39,6 +39,16 @@ import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
+/**
+ * This implementation is based on the usage pattern, that a top level DefaultFluentProducerTemplate instance is created
+ * as singleton and provided to the Camel end user (such as injected into a POJO).
+ * <p>
+ * The top level instance is then cloned once per message that is being built using the fluent method calls and then
+ * reset when the message has been sent.
+ * <p>
+ * Each cloned instance is not thread-safe as its assumed that its a single thread that calls the fluent method to build
+ * up the message to be sent.
+ */
public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
// transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
@@ -53,7 +63,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
private Endpoint defaultEndpoint;
private int maximumCacheSize;
private boolean eventNotifierEnabled;
- private volatile DefaultFluentProducerTemplate parent;
private volatile Endpoint endpoint;
private volatile String endpointUri;
private volatile ProducerTemplate template;
@@ -71,11 +80,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
};
}
- private DefaultFluentProducerTemplate(DefaultFluentProducerTemplate parent, CamelContext context,
+ private DefaultFluentProducerTemplate(CamelContext context,
ClassValue<Processor> resultProcessors,
Endpoint defaultEndpoint, int maximumCacheSize, boolean eventNotifierEnabled,
ProducerTemplate template, Endpoint endpoint, String endpointUri) {
- this.parent = parent;
this.context = context;
this.resultProcessors = resultProcessors;
this.defaultEndpoint = defaultEndpoint;
@@ -88,9 +96,8 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
}
private DefaultFluentProducerTemplate newClone() {
- this.cloned = true;
return new DefaultFluentProducerTemplate(
- parent, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
+ context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
endpointUri);
}
@@ -170,7 +177,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
public FluentProducerTemplate clearAll() {
clearBody();
clearHeaders();
-
return this;
}
@@ -352,10 +358,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
Processor processor = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
final Processor processorSupplier = processor != null ? processor : clone.defaultProcessor();
- // reset cloned flag so when we use it again it has to set values again
- cloned = false;
- useDefaultEndpoint = true;
-
T result;
if (type == Exchange.class) {
result = (T) clone.template().request(target, processorSupplier);
@@ -374,6 +376,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
ExchangeHelper.extractResultBody(exchange, exchange.getPattern()));
}
+ // reset cloned flag so when we use it again it has to set values again
+ cloned = false;
+ useDefaultEndpoint = true;
+
return result;
}
@@ -389,10 +395,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = clone.target();
- // reset cloned flag so when we use it again it has to set values again
- cloned = false;
- useDefaultEndpoint = true;
-
Future<T> result;
if (ObjectHelper.isNotEmpty(clone.headers)) {
// Make a copy of the headers and body so that async processing won't
@@ -409,6 +411,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
result = clone.template().asyncRequestBody(target, bodyCopy, type);
}
+ // reset cloned flag so when we use it again it has to set values again
+ cloned = false;
+ useDefaultEndpoint = true;
+
return result;
}
@@ -423,17 +429,21 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = clone.target();
- // reset cloned flag so when we use it again it has to set values again
- cloned = false;
-
+ Exchange result;
Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null;
if (exchange != null) {
- return clone.template().send(target, exchange);
+ result = clone.template().send(target, exchange);
} else {
Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
final Processor processor = proc != null ? proc : clone.defaultProcessor();
- return clone.template().send(target, processor);
+ result = clone.template().send(target, processor);
}
+
+ // reset cloned flag so when we use it again it has to set values again
+ cloned = false;
+ useDefaultEndpoint = true;
+
+ return result;
}
@Override
@@ -443,17 +453,21 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// Determine the target endpoint
final Endpoint target = clone.target();
- // reset cloned flag so when we use it again it has to set values again
- cloned = false;
-
+ Future<Exchange> result;
Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null;
if (exchange != null) {
- return clone.template().asyncSend(target, exchange);
+ result = clone.template().asyncSend(target, exchange);
} else {
Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
final Processor processor = proc != null ? proc : clone.defaultAsyncProcessor();
- return clone.template().asyncSend(target, processor);
+ result = clone.template().asyncSend(target, processor);
}
+
+ // reset cloned flag so when we use it again it has to set values again
+ cloned = false;
+ useDefaultEndpoint = true;
+
+ return result;
}
// ************************
@@ -463,10 +477,61 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
/**
* Create the FluentProducerTemplate by setting the camel context
*
- * @param context the camel context
+ * @param context the camel context
+ * @return a new created instance of the fluent producer template
*/
public static FluentProducerTemplate on(CamelContext context) {
DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+ // we create a new private instance so mark it as cloned
+ fluent.cloned = true;
+ fluent.start();
+ return fluent;
+ }
+
+ /**
+ * Create the FluentProducerTemplate by setting the camel context and default endpoint
+ *
+ * @param context the camel context
+ * @param endpoint the default endpoint
+ * @return a new created instance of the fluent producer template
+ */
+ public static FluentProducerTemplate on(CamelContext context, Endpoint endpoint) {
+ DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+ fluent.withDefaultEndpoint(endpoint);
+ // we create a new private instance so mark it as cloned
+ fluent.cloned = true;
+ fluent.start();
+ return fluent;
+ }
+
+ /**
+ * Create the FluentProducerTemplate by setting the camel context and default endpoint
+ *
+ * @param context the camel context
+ * @param resolver the default endpoint
+ * @return a new created instance of the fluent producer template
+ */
+ public static FluentProducerTemplate on(CamelContext context, EndpointProducerResolver resolver) {
+ DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+ fluent.withDefaultEndpoint(resolver);
+ // we create a new private instance so mark it as cloned
+ fluent.cloned = true;
+ fluent.start();
+ return fluent;
+ }
+
+ /**
+ * Create the FluentProducerTemplate by setting the camel context and default endpoint
+ *
+ * @param context the camel context
+ * @param endpoint the default endpoint
+ * @return a new created instance of the fluent producer template
+ */
+ public static FluentProducerTemplate on(CamelContext context, String endpoint) {
+ DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+ fluent.withDefaultEndpoint(endpoint);
+ // we create a new private instance so mark it as cloned
+ fluent.cloned = true;
fluent.start();
return fluent;
}
@@ -537,7 +602,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
protected void doStop() throws Exception {
clearAll();
- this.parent = null;
this.endpoint = null;
this.endpointUri = null;
this.exchangeSupplier = null;
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 6229f8c..ee46ec2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -92,9 +92,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
mock.expectedBodiesReceived("Bye World");
FluentProducerTemplate on = DefaultFluentProducerTemplate.on(context);
- on.withBody("Hello World");
- on.toF("direct:%s", "in");
- Object result = on.request();
+ Object result = on.withBody("Hello World").toF("direct:%s", "in").request();
assertMockEndpointsSatisfied();
@@ -108,7 +106,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Bye World");
- FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context).withDefaultEndpoint("direct:in");
+ FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context, "direct:in");
Object result = template.withBody("Hello World").request();