You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/07 07:40:25 UTC
[camel] 02/02: CAMEL-16462: camel-core - Optimize RecipientList EIP
to reduce object allocations.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0d9227ff16fb00e047fdd087740c87cce01bb545
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 09:39:18 2021 +0200
CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
.../apache/camel/processor/MulticastProcessor.java | 9 +++-
.../org/apache/camel/processor/RecipientList.java | 50 ++++++++--------------
.../camel/processor/RecipientListProcessor.java | 13 +++---
.../java/org/apache/camel/processor/Splitter.java | 5 ++-
4 files changed, 35 insertions(+), 42 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6f35e5f..c010738 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,9 +320,13 @@ public class MulticastProcessor extends AsyncProcessorSupport
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ return process(exchange, callback, null);
+ }
+
+ protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter) {
Iterable<ProcessorExchangePair> pairs;
try {
- pairs = createProcessorExchangePairs(exchange);
+ pairs = createProcessorExchangePairs(exchange, iter);
} catch (Throwable e) {
exchange.setException(e);
// unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
@@ -913,7 +917,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
}
- protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+ protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+ throws Exception {
List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
StreamCache streamCache = null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index d8b2321..78ef5bc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -75,6 +75,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
private boolean shutdownExecutorService;
private volatile ExecutorService aggregateExecutorService;
private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
+ private RecipientListProcessor recipientListProcessor;
public RecipientList(CamelContext camelContext) {
// use comma by default as delimiter
@@ -198,31 +199,8 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
iter = ObjectHelper.createIterator(recipientList, delimiter);
}
- // TODO: Do not create a new processor per exchange
- // TODO: Store iter on exchange property to be used when creating the pairs
- RecipientListProcessor rlp = new RecipientListProcessor(
- exchange.getContext(), null, producerCache, iter, getAggregationStrategy(),
- isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
- isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
- isStopOnAggregateException());
- rlp.setErrorHandler(errorHandler);
- rlp.setAggregateExecutorService(aggregateExecutorService);
- rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
- rlp.setCacheSize(getCacheSize());
- rlp.setId(getId());
- rlp.setRouteId(getRouteId());
-
- // start ourselves
- try {
- ServiceHelper.startService(rlp);
- } catch (Exception e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
-
// now let the multicast process the exchange
- return rlp.process(exchange, callback);
+ return recipientListProcessor.process(exchange, callback, iter);
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
@@ -230,15 +208,12 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
}
@Override
- protected void doInit() throws Exception {
+ protected void doStart() throws Exception {
if (errorHandler == null) {
// NoErrorHandler is the default base error handler if none has been configured
errorHandler = new NoErrorHandler(null);
}
- }
- @Override
- protected void doStart() throws Exception {
if (producerCache == null) {
if (cacheSize < 0) {
producerCache = new EmptyProducerCache(this, camelContext);
@@ -253,17 +228,30 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
aggregateExecutorService
= camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "RecipientList-AggregateTask", 0);
}
- ServiceHelper.startService(aggregationStrategy, producerCache);
+
+ recipientListProcessor = new RecipientListProcessor(
+ camelContext, null, producerCache, getAggregationStrategy(),
+ isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
+ isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
+ isStopOnAggregateException());
+ recipientListProcessor.setErrorHandler(errorHandler);
+ recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
+ recipientListProcessor.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+ recipientListProcessor.setCacheSize(getCacheSize());
+ recipientListProcessor.setId(getId());
+ recipientListProcessor.setRouteId(getRouteId());
+
+ ServiceHelper.startService(aggregationStrategy, producerCache, recipientListProcessor);
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(producerCache, aggregationStrategy);
+ ServiceHelper.stopService(producerCache, aggregationStrategy, recipientListProcessor);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
+ ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy, recipientListProcessor);
if (aggregateExecutorService != null) {
camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index e1dacf3..f72eede 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
public class RecipientListProcessor extends MulticastProcessor {
private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
- private final Iterator<?> iter;
private boolean ignoreInvalidEndpoints;
private final ProducerCache producerCache;
private int cacheSize;
@@ -149,20 +148,18 @@ public class RecipientListProcessor extends MulticastProcessor {
// TODO: camel-bean @RecipientList cacheSize
- public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter) {
+ public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache) {
super(camelContext, route, null);
this.producerCache = producerCache;
- this.iter = iter;
}
- public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+ public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
AggregationStrategy aggregationStrategy) {
super(camelContext, route, null, aggregationStrategy);
this.producerCache = producerCache;
- this.iter = iter;
}
- public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+ public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
boolean streaming, boolean stopOnException,
@@ -172,7 +169,6 @@ public class RecipientListProcessor extends MulticastProcessor {
streaming, stopOnException, timeout, onPrepare,
shareUnitOfWork, parallelAggregate, stopOnAggregateException);
this.producerCache = producerCache;
- this.iter = iter;
}
public int getCacheSize() {
@@ -192,7 +188,8 @@ public class RecipientListProcessor extends MulticastProcessor {
}
@Override
- protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+ protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+ throws Exception {
// here we iterate the recipient lists and create the exchange pair for each of those
List<ProcessorExchangePair> result = new ArrayList<>();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 58851cc..217b5bf 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
}
@Override
- protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+ protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+ throws Exception {
+ // iter is only currently used by Recipient List EIP so its null
+
Object value = expression.evaluate(exchange, Object.class);
if (exchange.getException() != null) {
// force any exceptions occurred during evaluation to be thrown