You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/27 05:21:06 UTC

[camel] branch main updated: CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for testing and reporting a problem.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9caa384  CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for testing and reporting a problem.
9caa384 is described below

commit 9caa38481f773099ab2118998d54c7583e69619a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 27 07:20:20 2021 +0200

    CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue. Thanks to Chris Nelson for testing and reporting a problem.
---
 .../org/apache/camel/FluentProducerTemplate.java   |  33 +++++-
 .../impl/engine/DefaultFluentProducerTemplate.java | 116 ++++++++++++++++-----
 .../camel/builder/FluentProducerTemplateTest.java  |   6 +-
 3 files changed, 123 insertions(+), 32 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index 068573b..f4de0ce 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -31,7 +31,36 @@ import org.apache.camel.util.ObjectHelper;
  * {@link org.apache.camel.CamelExecutionException} while others stores any thrown exception on the returned
  * {@link Exchange}. <br/>
  * <p/>
- * The {@link FluentProducerTemplate} is <b>thread safe</b>. <br/>
+ * The {@link FluentProducerTemplate} is <b>thread safe</b> under the assumption that it is the same (single)
+ * thread that builds the message (via the fluent methods) and also sends the message. <br/>
+ * When using the fluent template it is required to chain the methods, such as:
+ * 
+ * <pre>
+ *     FluentProducerTemplate fluent = ...
+ *     fluent.withHeader("foo", 123).withHeader("bar", 456).withBody("Hello World").to("kafka:cheese").send();
+ * </pre>
+ * 
+ * The following code is <b>wrong</b> (do not do this)
+ * 
+ * <pre>
+ *     FluentProducerTemplate fluent = ...
+ *     fluent.withHeader("foo", 123);
+ *     fluent.withHeader("bar", 456);
+ *     fluent.withBody("Hello World");
+ *     fluent.to("kafka:cheese");
+ *     fluent.send();
+ * </pre>
+ * 
+ * If you do not want to chain fluent methods you can do as follows:
+ * 
+ * <pre>
+ *     FluentProducerTemplate fluent = ...
+ *     fluent = fluent.withHeader("foo", 123);
+ *     fluent = fluent.withHeader("bar", 456);
+ *     fluent = fluent.withBody("Hello World");
+ *     fluent = fluent.to("kafka:cheese");
+ *     fluent.send();
+ * </pre>
  * <p/>
  * All the methods which sends a message may throw {@link FailedToCreateProducerException} in case the {@link Producer}
  * could not be created. Or a {@link NoSuchEndpointException} if the endpoint could not be resolved. There may be other
@@ -49,7 +78,7 @@ import org.apache.camel.util.ObjectHelper;
  * <br/>
  * <p/>
  * Before using the template it must be started. And when you are done using the template, make sure to {@link #stop()}
- * the template. <br/>
+ * the template.<br/>
  * <p/>
  * <b>Important note on usage:</b> See this
  * <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ entry</a> before
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index fbb8f34..182433c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -39,6 +39,16 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 
+/**
+ * This implementation is based on the usage pattern, that a top level DefaultFluentProducerTemplate instance is created
+ * as singleton and provided to the Camel end user (such as injected into a POJO).
+ * <p>
+ * The top level instance is then cloned once per message that is being built using the fluent method calls and then
+ * reset when the message has been sent.
+ * <p>
+ * Each cloned instance is not thread-safe, as it is assumed that a single thread calls the fluent methods to build
+ * up the message to be sent.
+ */
 public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
 
     // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
@@ -53,7 +63,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     private Endpoint defaultEndpoint;
     private int maximumCacheSize;
     private boolean eventNotifierEnabled;
-    private volatile DefaultFluentProducerTemplate parent;
     private volatile Endpoint endpoint;
     private volatile String endpointUri;
     private volatile ProducerTemplate template;
@@ -71,11 +80,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         };
     }
 
-    private DefaultFluentProducerTemplate(DefaultFluentProducerTemplate parent, CamelContext context,
+    private DefaultFluentProducerTemplate(CamelContext context,
                                           ClassValue<Processor> resultProcessors,
                                           Endpoint defaultEndpoint, int maximumCacheSize, boolean eventNotifierEnabled,
                                           ProducerTemplate template, Endpoint endpoint, String endpointUri) {
-        this.parent = parent;
         this.context = context;
         this.resultProcessors = resultProcessors;
         this.defaultEndpoint = defaultEndpoint;
@@ -88,9 +96,8 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     }
 
     private DefaultFluentProducerTemplate newClone() {
-        this.cloned = true;
         return new DefaultFluentProducerTemplate(
-                parent, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
+                context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
                 endpointUri);
     }
 
@@ -170,7 +177,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     public FluentProducerTemplate clearAll() {
         clearBody();
         clearHeaders();
-
         return this;
     }
 
@@ -352,10 +358,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         Processor processor = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
         final Processor processorSupplier = processor != null ? processor : clone.defaultProcessor();
 
-        // reset cloned flag so when we use it again it has to set values again
-        cloned = false;
-        useDefaultEndpoint = true;
-
         T result;
         if (type == Exchange.class) {
             result = (T) clone.template().request(target, processorSupplier);
@@ -374,6 +376,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
                     ExchangeHelper.extractResultBody(exchange, exchange.getPattern()));
         }
 
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
+        useDefaultEndpoint = true;
+
         return result;
     }
 
@@ -389,10 +395,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = clone.target();
 
-        // reset cloned flag so when we use it again it has to set values again
-        cloned = false;
-        useDefaultEndpoint = true;
-
         Future<T> result;
         if (ObjectHelper.isNotEmpty(clone.headers)) {
             // Make a copy of the headers and body so that async processing won't
@@ -409,6 +411,10 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
             result = clone.template().asyncRequestBody(target, bodyCopy, type);
         }
 
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
+        useDefaultEndpoint = true;
+
         return result;
     }
 
@@ -423,17 +429,21 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = clone.target();
 
-        // reset cloned flag so when we use it again it has to set values again
-        cloned = false;
-
+        Exchange result;
         Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null;
         if (exchange != null) {
-            return clone.template().send(target, exchange);
+            result = clone.template().send(target, exchange);
         } else {
             Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
             final Processor processor = proc != null ? proc : clone.defaultProcessor();
-            return clone.template().send(target, processor);
+            result = clone.template().send(target, processor);
         }
+
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
+        useDefaultEndpoint = true;
+
+        return result;
     }
 
     @Override
@@ -443,17 +453,21 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
         // Determine the target endpoint
         final Endpoint target = clone.target();
 
-        // reset cloned flag so when we use it again it has to set values again
-        cloned = false;
-
+        Future<Exchange> result;
         Exchange exchange = clone.exchangeSupplier != null ? clone.exchangeSupplier.get() : null;
         if (exchange != null) {
-            return clone.template().asyncSend(target, exchange);
+            result = clone.template().asyncSend(target, exchange);
         } else {
             Processor proc = clone.processorSupplier != null ? clone.processorSupplier.get() : null;
             final Processor processor = proc != null ? proc : clone.defaultAsyncProcessor();
-            return clone.template().asyncSend(target, processor);
+            result = clone.template().asyncSend(target, processor);
         }
+
+        // reset cloned flag so when we use it again it has to set values again
+        cloned = false;
+        useDefaultEndpoint = true;
+
+        return result;
     }
 
     // ************************
@@ -463,10 +477,61 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     /**
      * Create the FluentProducerTemplate by setting the camel context
      *
-     * @param context the camel context
+     * @param  context the camel context
+     * @return         a newly created instance of the fluent producer template
      */
     public static FluentProducerTemplate on(CamelContext context) {
         DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+        // we create a new private instance so mark it as cloned
+        fluent.cloned = true;
+        fluent.start();
+        return fluent;
+    }
+
+    /**
+     * Create the FluentProducerTemplate by setting the camel context and default endpoint
+     *
+     * @param  context  the camel context
+     * @param  endpoint the default endpoint
+     * @return          a newly created instance of the fluent producer template
+     */
+    public static FluentProducerTemplate on(CamelContext context, Endpoint endpoint) {
+        DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+        fluent.withDefaultEndpoint(endpoint);
+        // we create a new private instance so mark it as cloned
+        fluent.cloned = true;
+        fluent.start();
+        return fluent;
+    }
+
+    /**
+     * Create the FluentProducerTemplate by setting the camel context and default endpoint
+     *
+     * @param  context  the camel context
+     * @param  resolver the resolver used to determine the default endpoint
+     * @return          a newly created instance of the fluent producer template
+     */
+    public static FluentProducerTemplate on(CamelContext context, EndpointProducerResolver resolver) {
+        DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+        fluent.withDefaultEndpoint(resolver);
+        // we create a new private instance so mark it as cloned
+        fluent.cloned = true;
+        fluent.start();
+        return fluent;
+    }
+
+    /**
+     * Create the FluentProducerTemplate by setting the camel context and default endpoint
+     *
+     * @param  context  the camel context
+     * @param  endpoint the uri of the default endpoint
+     * @return          a newly created instance of the fluent producer template
+     */
+    public static FluentProducerTemplate on(CamelContext context, String endpoint) {
+        DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
+        fluent.withDefaultEndpoint(endpoint);
+        // we create a new private instance so mark it as cloned
+        fluent.cloned = true;
         fluent.start();
         return fluent;
     }
@@ -537,7 +602,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     @Override
     protected void doStop() throws Exception {
         clearAll();
-        this.parent = null;
         this.endpoint = null;
         this.endpointUri = null;
         this.exchangeSupplier = null;
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 6229f8c..ee46ec2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -92,9 +92,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         mock.expectedBodiesReceived("Bye World");
 
         FluentProducerTemplate on = DefaultFluentProducerTemplate.on(context);
-        on.withBody("Hello World");
-        on.toF("direct:%s", "in");
-        Object result = on.request();
+        Object result = on.withBody("Hello World").toF("direct:%s", "in").request();
 
         assertMockEndpointsSatisfied();
 
@@ -108,7 +106,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Bye World");
 
-        FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context).withDefaultEndpoint("direct:in");
+        FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context, "direct:in");
 
         Object result = template.withBody("Hello World").request();