You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:32 UTC

[camel] 03/11: CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9990d584352d42b304374e0c278ecc51768e8068
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:01:14 2020 +0100

    CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../org/apache/camel/processor/RoutingSlip.java    | 82 +++++++++++++++-------
 ...acheTest.java => DynamicRouterNoCacheTest.java} | 61 +++++++---------
 .../camel/processor/RecipientListNoCacheTest.java  | 56 +++++++--------
 .../camel/processor/RoutingSlipNoCacheTest.java    | 22 +++++-
 ...istNoCacheTest.java => WireTapNoCacheTest.java} | 75 ++++++++------------
 5 files changed, 159 insertions(+), 137 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index cb31c0c..d8218d5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.engine.DefaultProducerCache;
 import org.apache.camel.impl.engine.EmptyProducerCache;
@@ -35,7 +36,6 @@ import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.ObjectHelper;
@@ -241,9 +241,19 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
         }
 
         while (iter.hasNext(current)) {
+
+            boolean prototype = cacheSize < 0;
             Endpoint endpoint;
             try {
-                endpoint = resolveEndpoint(iter, exchange);
+                Object recipient = iter.next(exchange);
+                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                if (existing == null) {
+                    endpoint = resolveEndpoint(exchange, recipient, prototype);
+                } else {
+                    endpoint = existing;
+                    // we have an existing endpoint then its not a prototype scope
+                    prototype = false;
+                }
                 // if no endpoint was resolved then try the next
                 if (endpoint == null) {
                     continue;
@@ -255,7 +265,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
             }
 
             //process and prepare the routing slip
-            boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter);
+            boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter, prototype);
             current = prepareExchangeForRoutingSlip(current, endpoint);
             
             if (!sync) {
@@ -305,14 +315,28 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
         return true;
     }
 
-    protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
-        Object nextRecipient = iter.next(exchange);
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws Exception {
         Endpoint endpoint = null;
         try {
-            endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
+            endpoint = prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
         } catch (Exception e) {
             if (isIgnoreInvalidEndpoints()) {
-                LOG.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
+                LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
             } else {
                 throw e;
             }
@@ -355,7 +379,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
     }
 
     protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
-                                      final AsyncCallback originalCallback, final RoutingSlipIterator iter) {
+                                      final AsyncCallback originalCallback, final RoutingSlipIterator iter, final boolean prototype) {
 
         // this does the actual processing so log at trace level
         if (LOG.isTraceEnabled()) {
@@ -389,6 +413,10 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
 
                     // we only have to handle async completion of the routing slip
                     if (doneSync) {
+                        // and stop prototype endpoints
+                        if (prototype) {
+                            ServiceHelper.stopAndShutdownService(endpoint);
+                        }
                         cb.done(true);
                         return;
                     }
@@ -416,11 +444,20 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                                 break;
                             }
 
-                            Endpoint endpoint1;
+                            Endpoint nextEndpoint;
+                            boolean prototype = cacheSize < 0;
                             try {
-                                endpoint1 = resolveEndpoint(iter, ex);
+                                Object recipient = iter.next(ex);
+                                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                                if (existing == null) {
+                                    nextEndpoint = resolveEndpoint(exchange, recipient, prototype);
+                                } else {
+                                    nextEndpoint = existing;
+                                    // we have an existing endpoint then its not a prototype scope
+                                    prototype = false;
+                                }
                                 // if no endpoint was resolved then try the next
-                                if (endpoint1 == null) {
+                                if (nextEndpoint == null) {
                                     continue;
                                 }
                             } catch (Exception e) {
@@ -430,8 +467,16 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                             }
 
                             // prepare and process the routing slip
-                            boolean sync = processExchange(endpoint1, current, original, cb, iter);
-                            current = prepareExchangeForRoutingSlip(current, endpoint1);
+                            final boolean prototypeEndpoint = prototype;
+                            AsyncCallback cbNext = (doneNext) -> {
+                                // and stop prototype endpoints
+                                if (prototypeEndpoint) {
+                                    ServiceHelper.stopAndShutdownService(nextEndpoint);
+                                }
+                                cb.done(doneNext);
+                            };
+                            boolean sync = processExchange(nextEndpoint, current, original, cbNext, iter, prototype);
+                            current = prepareExchangeForRoutingSlip(current, nextEndpoint);
 
                             if (!sync) {
                                 if (LOG.isTraceEnabled()) {
@@ -499,17 +544,6 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
     }
 
     /**
-     * Copy the outbound data in 'source' to the inbound data in 'result'.
-     */
-    private void copyOutToIn(Exchange result, Exchange source) {
-        result.setException(source.getException());
-        result.setIn(getResultMessage(source));
-
-        result.getProperties().clear();
-        result.getProperties().putAll(source.getProperties());
-    }
-
-    /**
      * Creates the embedded processor to use when wrapping this routing slip in an error handler.
      */
     public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
similarity index 61%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
index f396c42..16f188b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
@@ -17,50 +17,21 @@
 package org.apache.camel.processor;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Headers;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
 import org.junit.Test;
 
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class DynamicRouterNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
-
-        sendBody("foo");
-        sendBody("bar");
-
-        assertMockEndpointsSatisfied();
-
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
-
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
-
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
-
-    @Test
-    public void testNoCacheTwo() throws Exception {
         assertEquals(1, context.getEndpointRegistry().size());
 
         sendBody("foo");
@@ -68,7 +39,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
+        DynamicRouter rl = (DynamicRouter) list.get(0);
         assertNotNull(rl);
         assertEquals(-1, rl.getCacheSize());
 
@@ -96,12 +67,28 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
     }
 
+    public String slip(@Headers Map headers) {
+        String header = (String) headers.get("recipientListHeader");
+        if (ObjectHelper.isEmpty(header)) {
+            return null;
+        }
+        if (header.contains(",")) {
+            String next = StringHelper.before(header, ",");
+            String rest = StringHelper.after(header, ",");
+            headers.put("recipientListHeader", rest);
+            return next;
+        } else {
+            // last slip
+            headers.put("recipientListHeader", "");
+            return header;
+        }
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a")
-                        .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+                from("direct:a").dynamicRouter(method(DynamicRouterNoCacheTest.class, "slip")).cacheSize(-1).id("foo");
             }
         };
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f396c42..a97b2d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -31,51 +31,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
+        assertEquals(1, context.getEndpointRegistry().size());
 
         sendBody("foo");
         sendBody("bar");
 
-        assertMockEndpointsSatisfied();
-
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
         RecipientList rl = (RecipientList) list.get(0);
         assertNotNull(rl);
         assertEquals(-1, rl.getCacheSize());
 
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
 
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
+        // now send again with mocks which then add endpoints
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
 
-    @Test
-    public void testNoCacheTwo() throws Exception {
-        assertEquals(1, context.getEndpointRegistry().size());
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
 
         sendBody("foo");
         sendBody("bar");
 
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
+        assertMockEndpointsSatisfied();
 
-        // check no additional endpoints added as cache was disabled
-        assertEquals(1, context.getEndpointRegistry().size());
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
 
-        // now send again with mocks which then add endpoints
+    @Test
+    public void testNoThreadPool() throws Exception {
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
@@ -89,7 +77,19 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
-        assertEquals(4, context.getEndpointRegistry().size());
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
+        assertNotNull(pc);
+        assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+        // and no thread pool is in use as timeout is 0
+        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
+        assertNull(pc);
     }
 
     protected void sendBody(String body) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
index 752916e..e1e92fb 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.processor;
 
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
@@ -25,6 +28,21 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo");
+        sendBody("bar");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RoutingSlip rl = (RoutingSlip) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
@@ -37,6 +55,8 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
         sendBody("bar");
 
         assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
     }
 
     protected void sendBody(String body) {
@@ -47,7 +67,7 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(0);
+                from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
             }
         };
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
similarity index 50%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
index f396c42..44e3dc9 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
@@ -17,83 +17,64 @@
 package org.apache.camel.processor;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
 import org.junit.Test;
 
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class WireTapNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
-
-        sendBody("foo");
-        sendBody("bar");
-
-        assertMockEndpointsSatisfied();
-
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
-
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
-
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
-
-    @Test
-    public void testNoCacheTwo() throws Exception {
         assertEquals(1, context.getEndpointRegistry().size());
 
-        sendBody("foo");
-        sendBody("bar");
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
 
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
+        WireTapProcessor wtp = (WireTapProcessor) list.get(0);
+        assertNotNull(wtp);
+        assertEquals(-1, wtp.getCacheSize());
 
         // check no additional endpoints added as cache was disabled
         assertEquals(1, context.getEndpointRegistry().size());
 
         // now send again with mocks which then add endpoints
+
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
 
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
+        x.expectedBodiesReceivedInAnyOrder("foo", "bar");
+        y.expectedBodiesReceivedInAnyOrder("foo", "bar");
+        z.expectedBodiesReceivedInAnyOrder("foo", "bar");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
 
-        sendBody("foo");
-        sendBody("bar");
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "mock:dummy");
 
         assertMockEndpointsSatisfied();
 
         assertEquals(4, context.getEndpointRegistry().size());
     }
 
-    protected void sendBody(String body) {
-        template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
     }
 
     @Override
@@ -101,7 +82,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:a")
-                        .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+                        .wireTap("${header.myHeader}").cacheSize(-1).id("foo").end();
             }
         };