You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/03/07 13:16:37 UTC
camel git commit: CAMEL-10820: DefaultFluentProducerTemplate mixes up
data when sending asynchronously
Repository: camel
Updated Branches:
refs/heads/master 4c8eab4e7 -> 05cbb33c0
CAMEL-10820: DefaultFluentProducerTemplate mixes up data when sending asynchronously
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/05cbb33c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/05cbb33c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/05cbb33c
Branch: refs/heads/master
Commit: 05cbb33c0da05aeb4c8c70137a41d53a58c38388
Parents: 4c8eab4
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Mar 6 11:26:19 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Mar 7 14:15:57 2017 +0100
----------------------------------------------------------------------
.../builder/DefaultFluentProducerTemplate.java | 140 ++++++++++---------
.../org/apache/camel/util/ObjectHelper.java | 7 +-
.../builder/FluentProducerTemplateTest.java | 66 +++++++++
3 files changed, 147 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
index bac078e..965afef 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -18,6 +18,7 @@ package org.apache.camel.builder;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -42,24 +43,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
private final ClassValue<ConvertBodyProcessor> resultProcessors;
private Map<String, Object> headers;
private Object body;
- private Endpoint endpoint;
- private Consumer<ProducerTemplate> templateCustomizer;
- private Supplier<Exchange> exchangeSupplier;
- private Supplier<Processor> processorSupplier;
- private volatile ProducerTemplate template;
- private Endpoint defaultEndpoint;
+ private Optional<Consumer<ProducerTemplate>> templateCustomizer;
+ private Optional<Supplier<Exchange>> exchangeSupplier;
+ private Optional<Supplier<Processor>> processorSupplier;
+ private Optional<Endpoint> endpoint;
+ private Optional<Endpoint> defaultEndpoint;
private int maximumCacheSize;
- private boolean eventNotifierEnabled = true;
+ private boolean eventNotifierEnabled;
+ private volatile ProducerTemplate template;
public DefaultFluentProducerTemplate(CamelContext context) {
this.context = context;
- this.headers = null;
- this.body = null;
- this.endpoint = null;
- this.templateCustomizer = null;
- this.exchangeSupplier = null;
- this.processorSupplier = () -> this::populateExchange;
- this.template = null;
+ this.endpoint = Optional.empty();
+ this.defaultEndpoint = Optional.empty();
+ this.eventNotifierEnabled = true;
+ this.templateCustomizer = Optional.empty();
+ this.exchangeSupplier = Optional.empty();
+ this.processorSupplier = Optional.empty();
this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
@Override
protected ConvertBodyProcessor computeValue(Class<?> type) {
@@ -95,12 +95,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public Endpoint getDefaultEndpoint() {
- return defaultEndpoint;
+ return defaultEndpoint.orElse(null);
}
@Override
public void setDefaultEndpoint(Endpoint defaultEndpoint) {
- this.defaultEndpoint = defaultEndpoint;
+ this.defaultEndpoint = Optional.of(defaultEndpoint);
}
@Override
@@ -168,7 +168,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
- this.templateCustomizer = templateCustomizer;
+ this.templateCustomizer = Optional.of(templateCustomizer);
return this;
}
@@ -179,7 +179,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
- this.exchangeSupplier = exchangeSupplier;
+ this.exchangeSupplier = Optional.of(exchangeSupplier);
return this;
}
@@ -190,7 +190,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
- this.processorSupplier = processorSupplier;
+ this.processorSupplier = Optional.of(processorSupplier);
return this;
}
@@ -201,7 +201,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public FluentProducerTemplate to(Endpoint endpoint) {
- this.endpoint = endpoint;
+ this.endpoint = Optional.of(endpoint);
return this;
}
@@ -217,13 +217,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
@SuppressWarnings("unchecked")
public <T> T request(Class<T> type) throws CamelExecutionException {
- T result;
- Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
- // we must have an endpoint to send to
- if (target == null) {
- throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
- }
+ // Determine the target endpoint
+ final Endpoint target = target();
+
+ // Create the default processor if not provided.
+ final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
+ T result;
if (type == Exchange.class) {
result = (T)template().request(target, processorSupplier.get());
} else if (type == Message.class) {
@@ -253,18 +253,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public <T> Future<T> asyncRequest(Class<T> type) {
- Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
- // we must have an endpoint to send to
- if (target == null) {
- throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
- }
+ // Determine the target endpoint
+ final Endpoint target = target();
Future<T> result;
- if (headers != null) {
- result = template().asyncRequestBodyAndHeaders(target, body, headers, type);
+ if (ObjectHelper.isNotEmpty(headers)) {
+ // Make a copy of the headers and body so that async processing won't
+ // be invalidated by subsequent reuse of the template
+ final Map<String, Object> headersCopy = new HashMap<>(headers);
+ final Object bodyCopy = body;
+
+ result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
} else {
- result = template().asyncRequestBody(target, body, type);
+ // Make a copy of the body so that async processing won't be
+ // invalidated by subsequent reuse of the template
+ final Object bodyCopy = body;
+
+ result = template().asyncRequestBody(target, bodyCopy, type);
}
return result;
@@ -276,30 +281,22 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
@Override
public Exchange send() throws CamelExecutionException {
- Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
- // we must have an endpoint to send to
- if (target == null) {
- throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
- }
+ // Determine the target endpoint
+ final Endpoint target = target();
- return exchangeSupplier != null
- ? template().send(target, exchangeSupplier.get())
- : template().send(target, processorSupplier.get());
+ return exchangeSupplier.isPresent()
+ ? template().send(target, exchangeSupplier.get().get())
+ : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
}
@Override
public Future<Exchange> asyncSend() {
- Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
- // we must have an endpoint to send to
- if (target == null) {
- throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
- }
+ // Determine the target endpoint
+ final Endpoint target = target();
- return exchangeSupplier != null
- ? template().asyncSend(target, exchangeSupplier.get())
- : template().asyncSend(target, processorSupplier.get());
+ return exchangeSupplier.isPresent()
+ ? template().asyncSend(target, exchangeSupplier.get().get())
+ : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
}
// ************************
@@ -320,25 +317,40 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
if (template == null) {
template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate();
- if (defaultEndpoint != null) {
- template.setDefaultEndpoint(defaultEndpoint);
- }
+ defaultEndpoint.ifPresent(template::setDefaultEndpoint);
template.setEventNotifierEnabled(eventNotifierEnabled);
- if (templateCustomizer != null) {
- templateCustomizer.accept(template);
- }
+ templateCustomizer.ifPresent(tc -> tc.accept(template));
}
return template;
}
- private void populateExchange(Exchange exchange) throws Exception {
- if (headers != null && !headers.isEmpty()) {
- exchange.getIn().getHeaders().putAll(headers);
+ private Processor defaultProcessor() {
+ return exchange -> {
+ ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll);
+ ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody);
+ };
+ }
+
+ private Processor defaultAsyncProcessor() {
+ final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
+ final Object bodyCopy = this.body;
+
+ return exchange -> {
+ ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
+ ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody);
+ };
+ }
+
+ private Endpoint target() {
+ if (endpoint.isPresent()) {
+ return endpoint.get();
}
- if (body != null) {
- exchange.getIn().setBody(body);
+ if (defaultEndpoint.isPresent()) {
+ return defaultEndpoint.get();
}
+
+ throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
index b87cbc6..91e3862 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -375,24 +375,27 @@ public final class ObjectHelper {
}
/**
- * Tests whether the value is <b>not</b> <tt>null</tt> or an empty string.
+ * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection.
*
* @param value the value, if its a String it will be tested for text length as well
* @return true if <b>not</b> empty
*/
+ @SuppressWarnings("unchecked")
public static boolean isNotEmpty(Object value) {
if (value == null) {
return false;
} else if (value instanceof String) {
String text = (String) value;
return text.trim().length() > 0;
+ } else if (value instanceof Collection) {
+ return !((Collection<?>)value).isEmpty();
} else {
return true;
}
}
/**
- * Tests whether the value is <b>not</b> <tt>null</tt> or an empty string.
+ * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection.
*
* @param value the value, if its a String it will be tested for text length as well
* @param consumer the consumer, the operation to be executed against value if not empty
http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 4308fc6..660e86d 100644
--- a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.builder;
+import java.util.concurrent.Future;
+
import org.apache.camel.CamelExecutionException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -47,6 +49,21 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
}
}
+ public void testDefaultEndpoint() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World");
+
+ FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+ fluent.setDefaultEndpointUri("direct:in");
+
+ Object result = fluent.withBody("Hello World").request();
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Bye World", result);
+
+ assertSame(context, fluent.getCamelContext());
+ }
+
public void testFromCamelContext() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Bye World");
@@ -306,6 +323,52 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
);
}
+ public void testAsyncRequest() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:async");
+ mock.expectedMessageCount(2);
+ mock.expectedHeaderValuesReceivedInAnyOrder("action", "action-1", "action-2");
+ mock.expectedBodiesReceivedInAnyOrder("body-1", "body-2");
+
+ FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+ Future<String> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncRequest(String.class);
+ Future<String> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncRequest(String.class);
+
+ String result1 = future1.get();
+ String result2 = future2.get();
+
+ mock.assertIsSatisfied();
+
+ assertEquals("body-1", result1);
+ assertEquals("body-2", result2);
+
+ String action = mock.getExchanges().get(0).getIn().getHeader("action", String.class);
+ if (action.equals("action-1")) {
+ assertEquals("body-1", mock.getExchanges().get(0).getIn().getBody(String.class));
+ }
+ if (action.equals("action-2")) {
+ assertEquals("body-2", mock.getExchanges().get(0).getIn().getBody(String.class));
+ }
+ }
+
+ public void testAsyncSend() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:async");
+ mock.expectedMessageCount(2);
+
+ FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+
+ Future<Exchange> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncSend();
+ Future<Exchange> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncSend();
+
+ Exchange exchange1 = future1.get();
+ Exchange exchange2 = future2.get();
+
+ assertEquals("action-1", exchange1.getIn().getHeader("action", String.class));
+ assertEquals("body-1", exchange1.getIn().getBody(String.class));
+
+ assertEquals("action-2", exchange2.getIn().getHeader("action", String.class));
+ assertEquals("body-2", exchange2.getIn().getBody(String.class));
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -343,6 +406,9 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
.to("mock:result");
from("direct:inout").transform(constant(123));
+
+ from("direct:async")
+ .to("mock:async");
}
};
}