You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/03/07 13:16:37 UTC

camel git commit: CAMEL-10820: DefaultFluentProducerTemplate mixes up data when sending asynchronously

Repository: camel
Updated Branches:
  refs/heads/master 4c8eab4e7 -> 05cbb33c0


CAMEL-10820: DefaultFluentProducerTemplate mixes up data when sending asynchronously


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/05cbb33c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/05cbb33c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/05cbb33c

Branch: refs/heads/master
Commit: 05cbb33c0da05aeb4c8c70137a41d53a58c38388
Parents: 4c8eab4
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Mar 6 11:26:19 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Mar 7 14:15:57 2017 +0100

----------------------------------------------------------------------
 .../builder/DefaultFluentProducerTemplate.java  | 140 ++++++++++---------
 .../org/apache/camel/util/ObjectHelper.java     |   7 +-
 .../builder/FluentProducerTemplateTest.java     |  66 +++++++++
 3 files changed, 147 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
index bac078e..965afef 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -18,6 +18,7 @@ package org.apache.camel.builder;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -42,24 +43,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     private final ClassValue<ConvertBodyProcessor> resultProcessors;
     private Map<String, Object> headers;
     private Object body;
-    private Endpoint endpoint;
-    private Consumer<ProducerTemplate> templateCustomizer;
-    private Supplier<Exchange> exchangeSupplier;
-    private Supplier<Processor> processorSupplier;
-    private volatile ProducerTemplate template;
-    private Endpoint defaultEndpoint;
+    private Optional<Consumer<ProducerTemplate>> templateCustomizer;
+    private Optional<Supplier<Exchange>> exchangeSupplier;
+    private Optional<Supplier<Processor>> processorSupplier;
+    private Optional<Endpoint> endpoint;
+    private Optional<Endpoint> defaultEndpoint;
     private int maximumCacheSize;
-    private boolean eventNotifierEnabled = true;
+    private boolean eventNotifierEnabled;
+    private volatile ProducerTemplate template;
 
     public DefaultFluentProducerTemplate(CamelContext context) {
         this.context = context;
-        this.headers = null;
-        this.body = null;
-        this.endpoint = null;
-        this.templateCustomizer = null;
-        this.exchangeSupplier = null;
-        this.processorSupplier = () -> this::populateExchange;
-        this.template = null;
+        this.endpoint = Optional.empty();
+        this.defaultEndpoint = Optional.empty();
+        this.eventNotifierEnabled = true;
+        this.templateCustomizer = Optional.empty();
+        this.exchangeSupplier = Optional.empty();
+        this.processorSupplier = Optional.empty();
         this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
             @Override
             protected ConvertBodyProcessor computeValue(Class<?> type) {
@@ -95,12 +95,12 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public Endpoint getDefaultEndpoint() {
-        return defaultEndpoint;
+        return defaultEndpoint.orElse(null);
     }
 
     @Override
     public void setDefaultEndpoint(Endpoint defaultEndpoint) {
-        this.defaultEndpoint = defaultEndpoint;
+        this.defaultEndpoint = Optional.of(defaultEndpoint);
     }
 
     @Override
@@ -168,7 +168,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
-        this.templateCustomizer = templateCustomizer;
+        this.templateCustomizer = Optional.of(templateCustomizer);
         return this;
     }
 
@@ -179,7 +179,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
-        this.exchangeSupplier = exchangeSupplier;
+        this.exchangeSupplier = Optional.of(exchangeSupplier);
         return this;
     }
 
@@ -190,7 +190,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
-        this.processorSupplier = processorSupplier;
+        this.processorSupplier = Optional.of(processorSupplier);
         return this;
     }
 
@@ -201,7 +201,7 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public FluentProducerTemplate to(Endpoint endpoint) {
-        this.endpoint = endpoint;
+        this.endpoint = Optional.of(endpoint);
         return this;
     }
 
@@ -217,13 +217,13 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
     @Override
     @SuppressWarnings("unchecked")
     public <T> T request(Class<T> type) throws CamelExecutionException {
-        T result;
-        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-        // we must have an endpoint to send to
-        if (target == null) {
-            throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
-        }
+        // Determine the target endpoint
+        final Endpoint target = target();
+
+        // Create the default processor if not provided.
+        final Supplier<Processor> processorSupplier = this.processorSupplier.orElse(() -> defaultProcessor());
 
+        T result;
         if (type == Exchange.class) {
             result = (T)template().request(target, processorSupplier.get());
         } else if (type == Message.class) {
@@ -253,18 +253,23 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public <T> Future<T> asyncRequest(Class<T> type) {
-        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
-        // we must have an endpoint to send to
-        if (target == null) {
-            throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
-        }
+        // Determine the target endpoint
+        final Endpoint target = target();
 
         Future<T> result;
-        if (headers != null) {
-            result = template().asyncRequestBodyAndHeaders(target, body, headers, type);
+        if (ObjectHelper.isNotEmpty(headers)) {
+            // Make a copy of the headers and body so that async processing won't
+            // be invalidated by subsequent reuse of the template
+            final Map<String, Object> headersCopy = new HashMap<>(headers);
+            final Object bodyCopy = body;
+
+            result = template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type);
         } else {
-            result = template().asyncRequestBody(target, body, type);
+            // Make a copy of the and body so that async processing won't be
+            // invalidated by subsequent reuse of the template
+            final Object bodyCopy = body;
+
+            result = template().asyncRequestBody(target, bodyCopy, type);
         }
 
         return result;
@@ -276,30 +281,22 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
     @Override
     public Exchange send() throws CamelExecutionException {
-        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
-        // we must have an endpoint to send to
-        if (target == null) {
-            throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
-        }
+        // Determine the target endpoint
+        final Endpoint target = target();
 
-        return exchangeSupplier != null
-            ? template().send(target, exchangeSupplier.get())
-            : template().send(target, processorSupplier.get());
+        return exchangeSupplier.isPresent()
+            ? template().send(target, exchangeSupplier.get().get())
+            : template().send(target, processorSupplier.orElse(() -> defaultProcessor()).get());
     }
 
     @Override
     public Future<Exchange> asyncSend() {
-        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
-
-        // we must have an endpoint to send to
-        if (target == null) {
-            throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
-        }
+        // Determine the target endpoint
+        final Endpoint target = target();
 
-        return exchangeSupplier != null
-            ? template().asyncSend(target, exchangeSupplier.get())
-            : template().asyncSend(target, processorSupplier.get());
+        return exchangeSupplier.isPresent()
+            ? template().asyncSend(target, exchangeSupplier.get().get())
+            : template().asyncSend(target, processorSupplier.orElse(() -> defaultAsyncProcessor()).get());
     }
 
     // ************************
@@ -320,25 +317,40 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu
 
         if (template == null) {
             template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate();
-            if (defaultEndpoint != null) {
-                template.setDefaultEndpoint(defaultEndpoint);
-            }
+            defaultEndpoint.ifPresent(template::setDefaultEndpoint);
             template.setEventNotifierEnabled(eventNotifierEnabled);
-            if (templateCustomizer != null) {
-                templateCustomizer.accept(template);
-            }
+            templateCustomizer.ifPresent(tc -> tc.accept(template));
         }
 
         return template;
     }
 
-    private void populateExchange(Exchange exchange) throws Exception {
-        if (headers != null && !headers.isEmpty()) {
-            exchange.getIn().getHeaders().putAll(headers);
+    private Processor defaultProcessor() {
+        return exchange -> {
+            ObjectHelper.ifNotEmpty(headers, exchange.getIn().getHeaders()::putAll);
+            ObjectHelper.ifNotEmpty(body, exchange.getIn()::setBody);
+        };
+    }
+
+    private Processor defaultAsyncProcessor() {
+        final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null;
+        final Object bodyCopy = this.body;
+
+        return exchange -> {
+            ObjectHelper.ifNotEmpty(headersCopy, exchange.getIn().getHeaders()::putAll);
+            ObjectHelper.ifNotEmpty(bodyCopy, exchange.getIn()::setBody);
+        };
+    }
+
+    private Endpoint target() {
+        if (endpoint.isPresent()) {
+            return endpoint.get();
         }
-        if (body != null) {
-            exchange.getIn().setBody(body);
+        if (defaultEndpoint.isPresent()) {
+            return defaultEndpoint.get();
         }
+
+        throw new IllegalArgumentException("No endpoint configured on FluentProducerTemplate. You can configure an endpoint with to(uri)");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
index b87cbc6..91e3862 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -375,24 +375,27 @@ public final class ObjectHelper {
     }
 
     /**
-     * Tests whether the value is <b>not</b> <tt>null</tt> or an empty string.
+     * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection.
      *
      * @param value  the value, if its a String it will be tested for text length as well
      * @return true if <b>not</b> empty
      */
+    @SuppressWarnings("unchecked")
     public static boolean isNotEmpty(Object value) {
         if (value == null) {
             return false;
         } else if (value instanceof String) {
             String text = (String) value;
             return text.trim().length() > 0;
+        } else if (value instanceof Collection) {
+            return !((Collection<?>)value).isEmpty();
         } else {
             return true;
         }
     }
 
     /**
-     * Tests whether the value is <b>not</b> <tt>null</tt> or an empty string.
+     * Tests whether the value is <b>not</b> <tt>null</tt>, an empty string or an empty collection.
      *
      * @param value  the value, if its a String it will be tested for text length as well
      * @param consumer  the consumer, the operation to be executed against value if not empty

http://git-wip-us.apache.org/repos/asf/camel/blob/05cbb33c/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 4308fc6..660e86d 100644
--- a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.builder;
 
+import java.util.concurrent.Future;
+
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -47,6 +49,21 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         }
     }
 
+    public void testDefaultEndpoint() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+        fluent.setDefaultEndpointUri("direct:in");
+
+        Object result = fluent.withBody("Hello World").request();
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye World", result);
+
+        assertSame(context, fluent.getCamelContext());
+    }
+
     public void testFromCamelContext() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Bye World");
@@ -306,6 +323,52 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         );
     }
 
+    public void testAsyncRequest() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:async");
+        mock.expectedMessageCount(2);
+        mock.expectedHeaderValuesReceivedInAnyOrder("action", "action-1", "action-2");
+        mock.expectedBodiesReceivedInAnyOrder("body-1", "body-2");
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+        Future<String> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncRequest(String.class);
+        Future<String> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncRequest(String.class);
+
+        String result1 = future1.get();
+        String result2 = future2.get();
+
+        mock.assertIsSatisfied();
+
+        assertEquals("body-1", result1);
+        assertEquals("body-2", result2);
+
+        String action = mock.getExchanges().get(0).getIn().getHeader("action", String.class);
+        if (action.equals("action-1")) {
+            assertEquals("body-1", mock.getExchanges().get(0).getIn().getBody(String.class));
+        }
+        if (action.equals("action-2")) {
+            assertEquals("body-2", mock.getExchanges().get(0).getIn().getBody(String.class));
+        }
+    }
+
+    public void testAsyncSend() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:async");
+        mock.expectedMessageCount(2);
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+
+        Future<Exchange> future1 = fluent.to("direct:async").withHeader("action", "action-1").withBody("body-1").asyncSend();
+        Future<Exchange> future2 = fluent.to("direct:async").withHeader("action", "action-2").withBody("body-2").asyncSend();
+
+        Exchange exchange1 = future1.get();
+        Exchange exchange2 = future2.get();
+
+        assertEquals("action-1", exchange1.getIn().getHeader("action", String.class));
+        assertEquals("body-1", exchange1.getIn().getBody(String.class));
+
+        assertEquals("action-2", exchange2.getIn().getHeader("action", String.class));
+        assertEquals("body-2", exchange2.getIn().getBody(String.class));
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -343,6 +406,9 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
                     .to("mock:result");
 
                 from("direct:inout").transform(constant(123));
+
+                from("direct:async")
+                    .to("mock:async");
             }
         };
     }