You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:32 UTC
[camel] 03/11: CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9990d584352d42b304374e0c278ecc51768e8068
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:01:14 2020 +0100
CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
.../org/apache/camel/processor/RoutingSlip.java | 82 +++++++++++++++-------
...acheTest.java => DynamicRouterNoCacheTest.java} | 61 +++++++---------
.../camel/processor/RecipientListNoCacheTest.java | 56 +++++++--------
.../camel/processor/RoutingSlipNoCacheTest.java | 22 +++++-
...istNoCacheTest.java => WireTapNoCacheTest.java} | 75 ++++++++------------
5 files changed, 159 insertions(+), 137 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index cb31c0c..d8218d5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Traceable;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
@@ -35,7 +36,6 @@ import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.ObjectHelper;
@@ -241,9 +241,19 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
}
while (iter.hasNext(current)) {
+
+ boolean prototype = cacheSize < 0;
Endpoint endpoint;
try {
- endpoint = resolveEndpoint(iter, exchange);
+ Object recipient = iter.next(exchange);
+ Endpoint existing = getExistingEndpoint(exchange, recipient);
+ if (existing == null) {
+ endpoint = resolveEndpoint(exchange, recipient, prototype);
+ } else {
+ endpoint = existing;
+ // we have an existing endpoint, so it is not prototype scoped
+ prototype = false;
+ }
// if no endpoint was resolved then try the next
if (endpoint == null) {
continue;
@@ -255,7 +265,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
}
//process and prepare the routing slip
- boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter);
+ boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter, prototype);
current = prepareExchangeForRoutingSlip(current, endpoint);
if (!sync) {
@@ -305,14 +315,28 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
return true;
}
- protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
- Object nextRecipient = iter.next(exchange);
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient instanceof String) {
+ recipient = ((String) recipient).trim();
+ }
+ if (recipient != null) {
+ // convert to a string type we can work with
+ String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ return exchange.getContext().hasEndpoint(uri);
+ }
+ return null;
+ }
+
+ protected Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws Exception {
Endpoint endpoint = null;
try {
- endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
+ endpoint = prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
} catch (Exception e) {
if (isIgnoreInvalidEndpoints()) {
- LOG.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
+ LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
} else {
throw e;
}
@@ -355,7 +379,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
}
protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
- final AsyncCallback originalCallback, final RoutingSlipIterator iter) {
+ final AsyncCallback originalCallback, final RoutingSlipIterator iter, final boolean prototype) {
// this does the actual processing so log at trace level
if (LOG.isTraceEnabled()) {
@@ -389,6 +413,10 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
// we only have to handle async completion of the routing slip
if (doneSync) {
+ // and stop prototype endpoints
+ if (prototype) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
cb.done(true);
return;
}
@@ -416,11 +444,20 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
break;
}
- Endpoint endpoint1;
+ Endpoint nextEndpoint;
+ boolean prototype = cacheSize < 0;
try {
- endpoint1 = resolveEndpoint(iter, ex);
+ Object recipient = iter.next(ex);
+ Endpoint existing = getExistingEndpoint(exchange, recipient);
+ if (existing == null) {
+ nextEndpoint = resolveEndpoint(exchange, recipient, prototype);
+ } else {
+ nextEndpoint = existing;
+ // we have an existing endpoint, so it is not prototype scoped
+ prototype = false;
+ }
// if no endpoint was resolved then try the next
- if (endpoint1 == null) {
+ if (nextEndpoint == null) {
continue;
}
} catch (Exception e) {
@@ -430,8 +467,16 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
}
// prepare and process the routing slip
- boolean sync = processExchange(endpoint1, current, original, cb, iter);
- current = prepareExchangeForRoutingSlip(current, endpoint1);
+ final boolean prototypeEndpoint = prototype;
+ AsyncCallback cbNext = (doneNext) -> {
+ // and stop prototype endpoints
+ if (prototypeEndpoint) {
+ ServiceHelper.stopAndShutdownService(nextEndpoint);
+ }
+ cb.done(doneNext);
+ };
+ boolean sync = processExchange(nextEndpoint, current, original, cbNext, iter, prototype);
+ current = prepareExchangeForRoutingSlip(current, nextEndpoint);
if (!sync) {
if (LOG.isTraceEnabled()) {
@@ -499,17 +544,6 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
}
/**
- * Copy the outbound data in 'source' to the inbound data in 'result'.
- */
- private void copyOutToIn(Exchange result, Exchange source) {
- result.setException(source.getException());
- result.setIn(getResultMessage(source));
-
- result.getProperties().clear();
- result.getProperties().putAll(source.getProperties());
- }
-
- /**
* Creates the embedded processor to use when wrapping this routing slip in an error handler.
*/
public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
similarity index 61%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
index f396c42..16f188b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
@@ -17,50 +17,21 @@
package org.apache.camel.processor;
import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Headers;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
import org.junit.Test;
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class DynamicRouterNoCacheTest extends ContextTestSupport {
@Test
public void testNoCache() throws Exception {
- MockEndpoint x = getMockEndpoint("mock:x");
- MockEndpoint y = getMockEndpoint("mock:y");
- MockEndpoint z = getMockEndpoint("mock:z");
-
- x.expectedBodiesReceived("foo", "bar");
- y.expectedBodiesReceived("foo", "bar");
- z.expectedBodiesReceived("foo", "bar");
-
- sendBody("foo");
- sendBody("bar");
-
- assertMockEndpointsSatisfied();
-
- // make sure its using an empty producer cache as the cache is disabled
- List<Processor> list = context.getRoute("route1").filter("foo");
- RecipientList rl = (RecipientList) list.get(0);
- assertNotNull(rl);
- assertEquals(-1, rl.getCacheSize());
-
- Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
- assertNotNull(pc);
- assertIsInstanceOf(EmptyProducerCache.class, pc);
-
- // and no thread pool is in use as timeout is 0
- pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
- assertNull(pc);
- }
-
- @Test
- public void testNoCacheTwo() throws Exception {
assertEquals(1, context.getEndpointRegistry().size());
sendBody("foo");
@@ -68,7 +39,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
// make sure its using an empty producer cache as the cache is disabled
List<Processor> list = context.getRoute("route1").filter("foo");
- RecipientList rl = (RecipientList) list.get(0);
+ DynamicRouter rl = (DynamicRouter) list.get(0);
assertNotNull(rl);
assertEquals(-1, rl.getCacheSize());
@@ -96,12 +67,28 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
}
+ public String slip(@Headers Map headers) {
+ String header = (String) headers.get("recipientListHeader");
+ if (ObjectHelper.isEmpty(header)) {
+ return null;
+ }
+ if (header.contains(",")) {
+ String next = StringHelper.before(header, ",");
+ String rest = StringHelper.after(header, ",");
+ headers.put("recipientListHeader", rest);
+ return next;
+ } else {
+ // last slip
+ headers.put("recipientListHeader", "");
+ return header;
+ }
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:a")
- .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+ from("direct:a").dynamicRouter(method(DynamicRouterNoCacheTest.class, "slip")).cacheSize(-1).id("foo");
}
};
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f396c42..a97b2d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -31,51 +31,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
@Test
public void testNoCache() throws Exception {
- MockEndpoint x = getMockEndpoint("mock:x");
- MockEndpoint y = getMockEndpoint("mock:y");
- MockEndpoint z = getMockEndpoint("mock:z");
-
- x.expectedBodiesReceived("foo", "bar");
- y.expectedBodiesReceived("foo", "bar");
- z.expectedBodiesReceived("foo", "bar");
+ assertEquals(1, context.getEndpointRegistry().size());
sendBody("foo");
sendBody("bar");
- assertMockEndpointsSatisfied();
-
// make sure its using an empty producer cache as the cache is disabled
List<Processor> list = context.getRoute("route1").filter("foo");
RecipientList rl = (RecipientList) list.get(0);
assertNotNull(rl);
assertEquals(-1, rl.getCacheSize());
- Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
- assertNotNull(pc);
- assertIsInstanceOf(EmptyProducerCache.class, pc);
+ // check no additional endpoints added as cache was disabled
+ assertEquals(1, context.getEndpointRegistry().size());
- // and no thread pool is in use as timeout is 0
- pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
- assertNull(pc);
- }
+ // now send again with mocks which then add endpoints
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
- @Test
- public void testNoCacheTwo() throws Exception {
- assertEquals(1, context.getEndpointRegistry().size());
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
sendBody("foo");
sendBody("bar");
- // make sure its using an empty producer cache as the cache is disabled
- List<Processor> list = context.getRoute("route1").filter("foo");
- RecipientList rl = (RecipientList) list.get(0);
- assertNotNull(rl);
- assertEquals(-1, rl.getCacheSize());
+ assertMockEndpointsSatisfied();
- // check no additional endpoints added as cache was disabled
- assertEquals(1, context.getEndpointRegistry().size());
+ assertEquals(4, context.getEndpointRegistry().size());
+ }
- // now send again with mocks which then add endpoints
+ @Test
+ public void testNoThreadPool() throws Exception {
MockEndpoint x = getMockEndpoint("mock:x");
MockEndpoint y = getMockEndpoint("mock:y");
MockEndpoint z = getMockEndpoint("mock:z");
@@ -89,7 +77,19 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
- assertEquals(4, context.getEndpointRegistry().size());
+ // make sure its using an empty producer cache as the cache is disabled
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ RecipientList rl = (RecipientList) list.get(0);
+ assertNotNull(rl);
+ assertEquals(-1, rl.getCacheSize());
+
+ Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
+ assertNotNull(pc);
+ assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+ // and no thread pool is in use as timeout is 0
+ pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
+ assertNull(pc);
}
protected void sendBody(String body) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
index 752916e..e1e92fb 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.camel.processor;
+import java.util.List;
+
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
@@ -25,6 +28,21 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
@Test
public void testNoCache() throws Exception {
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ sendBody("foo");
+ sendBody("bar");
+
+ // make sure its using an empty producer cache as the cache is disabled
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ RoutingSlip rl = (RoutingSlip) list.get(0);
+ assertNotNull(rl);
+ assertEquals(-1, rl.getCacheSize());
+
+ // check no additional endpoints added as cache was disabled
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ // now send again with mocks which then add endpoints
MockEndpoint x = getMockEndpoint("mock:x");
MockEndpoint y = getMockEndpoint("mock:y");
MockEndpoint z = getMockEndpoint("mock:z");
@@ -37,6 +55,8 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
sendBody("bar");
assertMockEndpointsSatisfied();
+
+ assertEquals(4, context.getEndpointRegistry().size());
}
protected void sendBody(String body) {
@@ -47,7 +67,7 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(0);
+ from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
}
};
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
similarity index 50%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
index f396c42..44e3dc9 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
@@ -17,83 +17,64 @@
package org.apache.camel.processor;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
import org.junit.Test;
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class WireTapNoCacheTest extends ContextTestSupport {
@Test
public void testNoCache() throws Exception {
- MockEndpoint x = getMockEndpoint("mock:x");
- MockEndpoint y = getMockEndpoint("mock:y");
- MockEndpoint z = getMockEndpoint("mock:z");
-
- x.expectedBodiesReceived("foo", "bar");
- y.expectedBodiesReceived("foo", "bar");
- z.expectedBodiesReceived("foo", "bar");
-
- sendBody("foo");
- sendBody("bar");
-
- assertMockEndpointsSatisfied();
-
- // make sure its using an empty producer cache as the cache is disabled
- List<Processor> list = context.getRoute("route1").filter("foo");
- RecipientList rl = (RecipientList) list.get(0);
- assertNotNull(rl);
- assertEquals(-1, rl.getCacheSize());
-
- Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
- assertNotNull(pc);
- assertIsInstanceOf(EmptyProducerCache.class, pc);
-
- // and no thread pool is in use as timeout is 0
- pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
- assertNull(pc);
- }
-
- @Test
- public void testNoCacheTwo() throws Exception {
assertEquals(1, context.getEndpointRegistry().size());
- sendBody("foo");
- sendBody("bar");
+ sendBody("foo", "mock:x");
+ sendBody("foo", "mock:y");
+ sendBody("foo", "mock:z");
+ sendBody("bar", "mock:x");
+ sendBody("bar", "mock:y");
+ sendBody("bar", "mock:z");
// make sure its using an empty producer cache as the cache is disabled
List<Processor> list = context.getRoute("route1").filter("foo");
- RecipientList rl = (RecipientList) list.get(0);
- assertNotNull(rl);
- assertEquals(-1, rl.getCacheSize());
+ WireTapProcessor wtp = (WireTapProcessor) list.get(0);
+ assertNotNull(wtp);
+ assertEquals(-1, wtp.getCacheSize());
// check no additional endpoints added as cache was disabled
assertEquals(1, context.getEndpointRegistry().size());
// now send again with mocks which then add endpoints
+
MockEndpoint x = getMockEndpoint("mock:x");
MockEndpoint y = getMockEndpoint("mock:y");
MockEndpoint z = getMockEndpoint("mock:z");
- x.expectedBodiesReceived("foo", "bar");
- y.expectedBodiesReceived("foo", "bar");
- z.expectedBodiesReceived("foo", "bar");
+ x.expectedBodiesReceivedInAnyOrder("foo", "bar");
+ y.expectedBodiesReceivedInAnyOrder("foo", "bar");
+ z.expectedBodiesReceivedInAnyOrder("foo", "bar");
+
+ assertEquals(4, context.getEndpointRegistry().size());
+
+ sendBody("foo", "mock:x");
+ sendBody("foo", "mock:y");
+ sendBody("foo", "mock:z");
+ sendBody("bar", "mock:x");
+ sendBody("bar", "mock:y");
+ sendBody("bar", "mock:z");
- sendBody("foo");
- sendBody("bar");
+ // should not register as new endpoint so we keep at 4
+ sendBody("dummy", "mock:dummy");
assertMockEndpointsSatisfied();
assertEquals(4, context.getEndpointRegistry().size());
}
- protected void sendBody(String body) {
- template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
+ protected void sendBody(String body, String uri) {
+ template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
}
@Override
@@ -101,7 +82,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
return new RouteBuilder() {
public void configure() {
from("direct:a")
- .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+ .wireTap("${header.myHeader}").cacheSize(-1).id("foo").end();
}
};