You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/16 10:24:00 UTC
[camel] branch master updated: CAMEL-16217: camel-core -
FluentProducerTemplate - optimize to uri when its the same endpoint
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e0194d5 CAMEL-16217: camel-core - FluentProducerTemplate - optimize to uri when its the same endpoint
e0194d5 is described below
commit e0194d5d4bf9b7d4d3b37dae73d40b41a4dd7650
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 16 11:23:04 2021 +0100
CAMEL-16217: camel-core - FluentProducerTemplate - optimize to uri when its the same endpoint
---
.../org/apache/camel/FluentProducerTemplate.java | 23 ++++++-
.../impl/engine/DefaultFluentProducerTemplate.java | 72 ++++++++++++++++++----
.../camel/builder/FluentProducerTemplateTest.java | 34 ++++++++++
.../camel/impl/DefaultProducerTemplateTest.java | 16 +++++
4 files changed, 132 insertions(+), 13 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
index be53feb..243dd2f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -254,6 +254,27 @@ public interface FluentProducerTemplate extends Service {
FluentProducerTemplate withProcessor(Supplier<Processor> processorSupplier);
/**
+ * Sets the default endpoint
+ *
+ * @param endpointUri the endpoint URI to send to
+ */
+ FluentProducerTemplate withDefaultEndpoint(String endpointUri);
+
+ /**
+ * Sets the default endpoint
+ *
+ * @param resolver the {@link EndpointProducerResolver} that supplies the endpoint to send to.
+ */
+ FluentProducerTemplate withDefaultEndpoint(EndpointProducerResolver resolver);
+
+ /**
+ * Sets the default endpoint
+ *
+ * @param endpoint the endpoint to send to
+ */
+ FluentProducerTemplate withDefaultEndpoint(Endpoint endpoint);
+
+ /**
* Endpoint to send to
*
* @param endpointUri the endpoint URI to send to
@@ -277,7 +298,7 @@ public interface FluentProducerTemplate extends Service {
/**
* Endpoint to send to
*
- * @param resolver the {@link EndpointConsumerResolver} that supply the endpoint to send to.
+ * @param resolver the {@link EndpointProducerResolver} that supplies the endpoint to send to.
*/
default FluentProducerTemplate to(EndpointProducerResolver resolver) {
final CamelContext context = ObjectHelper.notNull(getCamelContext(), "camel context");
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index 4fb43f7..68aa64d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -26,14 +26,13 @@ import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointProducerResolver;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
-import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.ConvertBodyProcessor;
import org.apache.camel.support.service.ServiceHelper;
@@ -45,23 +44,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
// transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe
private Map<String, Object> headers;
private Object body;
- private Endpoint endpoint;
private Supplier<Exchange> exchangeSupplier;
private Supplier<Processor> processorSupplier;
private Consumer<ProducerTemplate> templateCustomizer;
private final CamelContext context;
- private final ProcessorFactory processorFactory;
private final ClassValue<Processor> resultProcessors;
private Endpoint defaultEndpoint;
private int maximumCacheSize;
private boolean eventNotifierEnabled;
+ private volatile DefaultFluentProducerTemplate parent;
+ private volatile Endpoint endpoint;
+ private volatile String endpointUri;
private volatile ProducerTemplate template;
private volatile boolean cloned;
public DefaultFluentProducerTemplate(CamelContext context) {
this.context = context;
- this.processorFactory = context.adapt(ExtendedCamelContext.class).getProcessorFactory();
this.eventNotifierEnabled = true;
this.resultProcessors = new ClassValue<Processor>() {
@Override
@@ -71,22 +70,27 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
};
}
- private DefaultFluentProducerTemplate(CamelContext context, ClassValue<Processor> resultProcessors,
+ private DefaultFluentProducerTemplate(DefaultFluentProducerTemplate parent, CamelContext context,
+ ClassValue<Processor> resultProcessors,
Endpoint defaultEndpoint, int maximumCacheSize, boolean eventNotifierEnabled,
- ProducerTemplate template) {
+ ProducerTemplate template, Endpoint endpoint, String endpointUri) {
+ this.parent = parent;
this.context = context;
- this.processorFactory = context.adapt(ExtendedCamelContext.class).getProcessorFactory();
this.resultProcessors = resultProcessors;
this.defaultEndpoint = defaultEndpoint;
this.maximumCacheSize = maximumCacheSize;
this.eventNotifierEnabled = eventNotifierEnabled;
this.template = template;
+ this.endpoint = endpoint;
+ this.endpointUri = endpointUri;
this.cloned = true;
}
private DefaultFluentProducerTemplate newClone() {
+ this.cloned = true;
return new DefaultFluentProducerTemplate(
- context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template);
+ this, context, resultProcessors, defaultEndpoint, maximumCacheSize, eventNotifierEnabled, template, endpoint,
+ endpointUri);
}
private DefaultFluentProducerTemplate checkCloned() {
@@ -220,6 +224,33 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
}
@Override
+ public FluentProducerTemplate withDefaultEndpoint(String endpointUri) {
+ if (cloned) {
+ throw new IllegalArgumentException("Default endpoint must be set before template has been used");
+ }
+ this.defaultEndpoint = getCamelContext().getEndpoint(endpointUri);
+ return this;
+ }
+
+ @Override
+ public FluentProducerTemplate withDefaultEndpoint(EndpointProducerResolver resolver) {
+ if (cloned) {
+ throw new IllegalArgumentException("Default endpoint must be set before template has been used");
+ }
+ this.defaultEndpoint = resolver.resolve(getCamelContext());
+ return this;
+ }
+
+ @Override
+ public FluentProducerTemplate withDefaultEndpoint(Endpoint endpoint) {
+ if (cloned) {
+ throw new IllegalArgumentException("Default endpoint must be set before template has been used");
+ }
+ this.defaultEndpoint = endpoint;
+ return this;
+ }
+
+ @Override
public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
if (this.templateCustomizer != null && isStarted()) {
throw new IllegalArgumentException("Not allowed after template has been started");
@@ -264,7 +295,24 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate to(String endpointUri) {
- return to(context.getEndpoint(endpointUri));
+ DefaultFluentProducerTemplate clone = checkCloned();
+
+ // optimize if we send to the same endpoint as before
+ if (clone.endpoint != null && clone.endpointUri != null && clone.endpointUri.equals(endpointUri)) {
+ return clone;
+ } else {
+ // store state of this endpoint so we can potentially reuse it again if sending to same endpoint next time
+ clone.endpointUri = endpointUri;
+ clone.endpoint = context.getEndpoint(endpointUri);
+ if (this.parent != null) {
+ this.parent.endpointUri = endpointUri;
+ this.parent.endpoint = clone.endpoint;
+ } else {
+ this.endpointUri = endpointUri;
+ this.endpoint = clone.endpoint;
+ }
+ return clone;
+ }
}
@Override
@@ -415,8 +463,6 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
public static FluentProducerTemplate on(CamelContext context) {
DefaultFluentProducerTemplate fluent = new DefaultFluentProducerTemplate(context);
fluent.start();
- // mark it as cloned as its started
- fluent.cloned = true;
return fluent;
}
@@ -479,7 +525,9 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
protected void doStop() throws Exception {
clearAll();
+ this.parent = null;
this.endpoint = null;
+ this.endpointUri = null;
this.exchangeSupplier = null;
this.processorSupplier = null;
this.templateCustomizer = null;
diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index b9b5e8d..d5163ef 100644
--- a/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -104,6 +104,22 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
}
@Test
+ public void testWithDefaultEndpoint() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World");
+
+ FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context).withDefaultEndpoint("direct:in");
+
+ Object result = template.withBody("Hello World").request();
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", result);
+
+ assertSame(context, template.getCamelContext());
+ }
+
+ @Test
public void testIn() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Bye World");
@@ -118,6 +134,24 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
}
@Test
+ public void testInTwice() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World", "Bye World");
+
+ FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context);
+
+ Object result = template.withBody("Hello World").to("direct:in").request();
+ Object result2 = template.withBody("Hello World Again").to("direct:in").request();
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", result);
+ assertEquals("Bye World", result2);
+
+ assertSame(context, template.getCamelContext());
+ }
+
+ @Test
public void testInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Bye Bye World");
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
index 1fb0665..a25b2d8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateTest.java
@@ -55,6 +55,22 @@ public class DefaultProducerTemplateTest extends ContextTestSupport {
}
@Test
+ public void testInTwice() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World", "Bye World");
+
+ Object result = template.requestBody("direct:in", "Hello World");
+ Object result2 = template.requestBody("direct:in", "Hello World Again");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", result);
+ assertEquals("Bye World", result2);
+
+ assertSame(context, template.getCamelContext());
+ }
+
+ @Test
public void testInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Bye Bye World");