You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:31 UTC
[camel] 02/11: CAMEL-14596: camel-core - Optimize Recipient List
when cacheSize = -1 to avoid creating endpoints. Introduced
getPrototypeEndpoint API.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1859881d17a420384037531a55f6b7623d8c1c16
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 04:56:57 2020 +0100
CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
.../org/apache/camel/processor/RecipientList.java | 1 +
.../camel/processor/RecipientListProcessor.java | 61 +++++++++++++++++++---
.../camel/processor/RecipientListNoCacheTest.java | 33 ++++++++++++
3 files changed, 88 insertions(+), 7 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index 4903429..f473dfc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -193,6 +193,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
isStopOnAggregateException());
rlp.setAggregateExecutorService(aggregateExecutorService);
rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+ rlp.setCacheSize(getCacheSize());
rlp.setId(getId());
rlp.setRouteId(getRouteId());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index fb5b7c9..3eb6352 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,6 +32,7 @@ import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.ProducerCache;
@@ -66,6 +67,7 @@ public class RecipientListProcessor extends MulticastProcessor {
private final Iterator<?> iter;
private boolean ignoreInvalidEndpoints;
private ProducerCache producerCache;
+ private int cacheSize;
/**
* Class that represent each step in the recipient list to do
@@ -82,9 +84,10 @@ public class RecipientListProcessor extends MulticastProcessor {
private final ProducerCache producerCache;
private final ExchangePattern pattern;
private volatile ExchangePattern originalPattern;
+ private final boolean prototypeEndpoint;
private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
- Processor prepared, Exchange exchange, ExchangePattern pattern) {
+ Processor prepared, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
this.index = index;
this.producerCache = producerCache;
this.endpoint = endpoint;
@@ -92,6 +95,7 @@ public class RecipientListProcessor extends MulticastProcessor {
this.prepared = prepared;
this.exchange = exchange;
this.pattern = pattern;
+ this.prototypeEndpoint = prototypeEndpoint;
}
@Override
@@ -139,6 +143,10 @@ public class RecipientListProcessor extends MulticastProcessor {
}
// when we are done we should release back in pool
producerCache.releaseProducer(endpoint, producer);
+ // and stop prototype endpoints
+ if (prototypeEndpoint) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
@@ -148,6 +156,8 @@ public class RecipientListProcessor extends MulticastProcessor {
}
+ // TODO: camel-bean @RecipientList cacheSize
+
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter) {
super(camelContext, null);
this.producerCache = producerCache;
@@ -169,6 +179,14 @@ public class RecipientListProcessor extends MulticastProcessor {
this.iter = iter;
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
public boolean isIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
@@ -185,12 +203,21 @@ public class RecipientListProcessor extends MulticastProcessor {
// at first we must lookup the endpoint and acquire the producer which can send to the endpoint
int index = 0;
while (iter.hasNext()) {
+ boolean prototype = cacheSize < 0;
+
Object recipient = iter.next();
Endpoint endpoint;
Producer producer;
ExchangePattern pattern;
try {
- endpoint = resolveEndpoint(exchange, recipient);
+ Endpoint existing = getExistingEndpoint(exchange, recipient);
+ if (existing == null) {
+ endpoint = resolveEndpoint(exchange, recipient, prototype);
+ } else {
+ endpoint = existing;
+ // we have an existing endpoint then its not a prototype scope
+ prototype = false;
+ }
pattern = resolveExchangePattern(recipient);
producer = producerCache.acquireProducer(endpoint);
} catch (Exception e) {
@@ -206,7 +233,7 @@ public class RecipientListProcessor extends MulticastProcessor {
}
// then create the exchange pair
- result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
+ result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype));
}
return result;
@@ -215,7 +242,8 @@ public class RecipientListProcessor extends MulticastProcessor {
/**
* This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
*/
- protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
+ protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer,
+ Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
Processor prepared = producer;
// copy exchange, and do not share the unit of work
@@ -243,7 +271,7 @@ public class RecipientListProcessor extends MulticastProcessor {
}
// and create the pair
- return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
+ return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern, prototypeEndpoint);
}
@Override
@@ -263,12 +291,31 @@ public class RecipientListProcessor extends MulticastProcessor {
return answer;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient instanceof String) {
+ recipient = ((String) recipient).trim();
+ }
+ if (recipient != null) {
+ // convert to a string type we can work with
+ String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ return exchange.getContext().hasEndpoint(uri);
+ }
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
// trim strings as end users might have added spaces between separators
if (recipient instanceof String) {
recipient = ((String) recipient).trim();
}
- return ExchangeHelper.resolveEndpoint(exchange, recipient);
+ if (recipient != null) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+ } else {
+ return null;
+ }
}
protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f4a9575..f396c42 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -59,6 +59,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
assertNull(pc);
}
+ @Test
+ public void testNoCacheTwo() throws Exception {
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ sendBody("foo");
+ sendBody("bar");
+
+ // make sure its using an empty producer cache as the cache is disabled
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ RecipientList rl = (RecipientList) list.get(0);
+ assertNotNull(rl);
+ assertEquals(-1, rl.getCacheSize());
+
+ // check no additional endpoints added as cache was disabled
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ // now send again with mocks which then add endpoints
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, context.getEndpointRegistry().size());
+ }
+
protected void sendBody(String body) {
template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
}