You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/17 08:06:45 UTC

[camel] branch master updated: CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 864cee3  CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue
864cee3 is described below

commit 864cee3219c667e0c8fc3d0de43f7b8bc055b5d4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Apr 17 10:06:06 2021 +0200

    CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue
---
 .../impl/engine/DefaultFluentProducerTemplate.java | 28 ++++++++--------------
 .../camel/builder/FluentProducerTemplateTest.java  | 25 +++++++++++++++++++
 2 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 7d47e78..61da95e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -58,6 +58,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     private volatile String endpointUri;
     private volatile ProducerTemplate template;
     private volatile boolean cloned;
+    private volatile boolean useDefaultEndpoint = true;
 
     public DefaultFluentProducerTemplate(CamelContext context) {
         this.context = context;
@@ -89,7 +90,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     private DefaultFluentProducerTemplate newClone() {
         this.cloned = true;
         return new DefaultFluentProducerTemplate(
-                this, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
+                parent, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
                 endpointUri);
     }
 
@@ -310,28 +311,17 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     public FluentProducerTemplate to(String endpointUri) {
         DefaultFluentProducerTemplate clone = checkCloned();
 
-        // optimize if we send to the same endpoint as before
-        if (clone.endpoint != null && clone.endpointUri != null && clone.endpointUri.equals(endpointUri)) {
-            return clone;
-        } else {
-            // store state of this endpoint so we can potentially reuse it again if sending to same endpoint next time
-            clone.endpointUri = endpointUri;
-            clone.endpoint = context.getEndpoint(endpointUri);
-            if (this.parent != null) {
-                this.parent.endpointUri = endpointUri;
-                this.parent.endpoint = clone.endpoint;
-            } else {
-                this.endpointUri = endpointUri;
-                this.endpoint = clone.endpoint;
-            }
-            return clone;
-        }
+        clone.useDefaultEndpoint = false;
+        clone.endpointUri = endpointUri;
+        clone.endpoint = context.getEndpoint(endpointUri);
+        return clone;
     }
 
     @Override
     public FluentProducerTemplate to(Endpoint endpoint) {
         DefaultFluentProducerTemplate clone = checkCloned();
 
+        clone.useDefaultEndpoint = false;
         clone.endpoint = endpoint;
         return clone;
     }
@@ -364,6 +354,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
         // reset cloned flag so when we use it again it has to set values again
         cloned = false;
+        useDefaultEndpoint = true;
 
         T result;
         if (type == Exchange.class) {
@@ -400,6 +391,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
         // reset cloned flag so when we use it again it has to set values again
         cloned = false;
+        useDefaultEndpoint = true;
 
         Future<T> result;
         if (ObjectHelper.isNotEmpty(clone.headers)) {
@@ -501,7 +493,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     }
 
     private Endpoint target() {
-        if (endpoint != null) {
+        if (!useDefaultEndpoint && endpoint != null) {
             return endpoint;
         }
         if (defaultEndpoint != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index d5163ef..b14aa92 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -408,6 +408,27 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testUseFourTimesSameThread() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:echo");
+        mock.expectedBodiesReceived("Camel", "Beer");
+        mock.message(0).header("foo").isEqualTo("!");
+        mock.message(1).header("foo").isNull();
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+        fluent.setDefaultEndpointUri("direct:red");
+        Object result = fluent.withBody("Camel").withHeader("foo", "!").to("direct:echo").request();
+        Object result2 = fluent.withBody("World").to("direct:hi").request();
+        Object result3 = fluent.withBody("Beer").to("direct:echo").request();
+        Object result4 = fluent.withBody("Wine").request();
+        assertEquals("CamelCamel!", result);
+        assertEquals("Hi World", result2);
+        assertEquals("BeerBeer", result3);
+        assertEquals("Red Wine", result4);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -436,6 +457,10 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
 
                 from("direct:echo").to("mock:echo").setBody().simple("${body}${body}${header.foo}");
 
+                from("direct:hi").setBody().simple("Hi ${body}");
+
+                from("direct:red").setBody().simple("Red ${body}");
+
             }
         };
     }