You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/17 08:06:45 UTC
[camel] branch master updated: CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 864cee3 CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue
864cee3 is described below
commit 864cee3219c667e0c8fc3d0de43f7b8bc055b5d4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Apr 17 10:06:06 2021 +0200
CAMEL-16525: camel-core - FluentProducerTemplate - Thread safety issue
---
.../impl/engine/DefaultFluentProducerTemplate.java | 28 ++++++++--------------
.../camel/builder/FluentProducerTemplateTest.java | 25 +++++++++++++++++++
2 files changed, 35 insertions(+), 18 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 7d47e78..61da95e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -58,6 +58,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
private volatile String endpointUri;
private volatile ProducerTemplate template;
private volatile boolean cloned;
+ private volatile boolean useDefaultEndpoint = true;
public DefaultFluentProducerTemplate(CamelContext context) {
this.context = context;
@@ -89,7 +90,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
private DefaultFluentProducerTemplate newClone() {
this.cloned = true;
return new DefaultFluentProducerTemplate(
- this, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
+ parent, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
endpointUri);
}
@@ -310,28 +311,17 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
public FluentProducerTemplate to(String endpointUri) {
DefaultFluentProducerTemplate clone = checkCloned();
- // optimize if we send to the same endpoint as before
- if (clone.endpoint != null && clone.endpointUri != null && clone.endpointUri.equals(endpointUri)) {
- return clone;
- } else {
- // store state of this endpoint so we can potentially reuse it again if sending to same endpoint next time
- clone.endpointUri = endpointUri;
- clone.endpoint = context.getEndpoint(endpointUri);
- if (this.parent != null) {
- this.parent.endpointUri = endpointUri;
- this.parent.endpoint = clone.endpoint;
- } else {
- this.endpointUri = endpointUri;
- this.endpoint = clone.endpoint;
- }
- return clone;
- }
+ clone.useDefaultEndpoint = false;
+ clone.endpointUri = endpointUri;
+ clone.endpoint = context.getEndpoint(endpointUri);
+ return clone;
}
@Override
public FluentProducerTemplate to(Endpoint endpoint) {
DefaultFluentProducerTemplate clone = checkCloned();
+ clone.useDefaultEndpoint = false;
clone.endpoint = endpoint;
return clone;
}
@@ -364,6 +354,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// reset cloned flag so when we use it again it has to set values again
cloned = false;
+ useDefaultEndpoint = true;
T result;
if (type == Exchange.class) {
@@ -400,6 +391,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// reset cloned flag so when we use it again it has to set values again
cloned = false;
+ useDefaultEndpoint = true;
Future<T> result;
if (ObjectHelper.isNotEmpty(clone.headers)) {
@@ -501,7 +493,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
}
private Endpoint target() {
- if (endpoint != null) {
+ if (!useDefaultEndpoint && endpoint != null) {
return endpoint;
}
if (defaultEndpoint != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index d5163ef..b14aa92 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -408,6 +408,27 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
+ @Test
+ public void testUseFourTimesSameThread() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:echo");
+ mock.expectedBodiesReceived("Camel", "Beer");
+ mock.message(0).header("foo").isEqualTo("!");
+ mock.message(1).header("foo").isNull();
+
+ FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+ fluent.setDefaultEndpointUri("direct:red");
+ Object result = fluent.withBody("Camel").withHeader("foo", "!").to("direct:echo").request();
+ Object result2 = fluent.withBody("World").to("direct:hi").request();
+ Object result3 = fluent.withBody("Beer").to("direct:echo").request();
+ Object result4 = fluent.withBody("Wine").request();
+ assertEquals("CamelCamel!", result);
+ assertEquals("Hi World", result2);
+ assertEquals("BeerBeer", result3);
+ assertEquals("Red Wine", result4);
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -436,6 +457,10 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
from("direct:echo").to("mock:echo").setBody().simple("${body}${body}${header.foo}");
+ from("direct:hi").setBody().simple("Hi ${body}");
+
+ from("direct:red").setBody().simple("Red ${body}");
+
}
};
}