You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/07 07:40:25 UTC

[camel] 02/02: CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0d9227ff16fb00e047fdd087740c87cce01bb545
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 09:39:18 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../apache/camel/processor/MulticastProcessor.java |  9 +++-
 .../org/apache/camel/processor/RecipientList.java  | 50 ++++++++--------------
 .../camel/processor/RecipientListProcessor.java    | 13 +++---
 .../java/org/apache/camel/processor/Splitter.java  |  5 ++-
 4 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6f35e5f..c010738 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,9 +320,13 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        return process(exchange, callback, null);
+    }
+
+    protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter) {
         Iterable<ProcessorExchangePair> pairs;
         try {
-            pairs = createProcessorExchangePairs(exchange);
+            pairs = createProcessorExchangePairs(exchange, iter);
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
@@ -913,7 +917,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
     }
 
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
 
         StreamCache streamCache = null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index d8b2321..78ef5bc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -75,6 +75,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
     private boolean shutdownExecutorService;
     private volatile ExecutorService aggregateExecutorService;
     private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
+    private RecipientListProcessor recipientListProcessor;
 
     public RecipientList(CamelContext camelContext) {
         // use comma by default as delimiter
@@ -198,31 +199,8 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
             iter = ObjectHelper.createIterator(recipientList, delimiter);
         }
 
-        // TODO: Do not create a new processor per exchange
-        // TODO: Store iter on exchange property to be used when creating the pairs
-        RecipientListProcessor rlp = new RecipientListProcessor(
-                exchange.getContext(), null, producerCache, iter, getAggregationStrategy(),
-                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
-                isStopOnAggregateException());
-        rlp.setErrorHandler(errorHandler);
-        rlp.setAggregateExecutorService(aggregateExecutorService);
-        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
-        rlp.setCacheSize(getCacheSize());
-        rlp.setId(getId());
-        rlp.setRouteId(getRouteId());
-
-        // start ourselves
-        try {
-            ServiceHelper.startService(rlp);
-        } catch (Exception e) {
-            exchange.setException(e);
-            callback.done(true);
-            return true;
-        }
-
         // now let the multicast process the exchange
-        return rlp.process(exchange, callback);
+        return recipientListProcessor.process(exchange, callback, iter);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
@@ -230,15 +208,12 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
     }
 
     @Override
-    protected void doInit() throws Exception {
+    protected void doStart() throws Exception {
         if (errorHandler == null) {
             // NoErrorHandler is the default base error handler if none has been configured
             errorHandler = new NoErrorHandler(null);
         }
-    }
 
-    @Override
-    protected void doStart() throws Exception {
         if (producerCache == null) {
             if (cacheSize < 0) {
                 producerCache = new EmptyProducerCache(this, camelContext);
@@ -253,17 +228,30 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
             aggregateExecutorService
                     = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "RecipientList-AggregateTask", 0);
         }
-        ServiceHelper.startService(aggregationStrategy, producerCache);
+
+        recipientListProcessor = new RecipientListProcessor(
+                camelContext, null, producerCache, getAggregationStrategy(),
+                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
+                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
+                isStopOnAggregateException());
+        recipientListProcessor.setErrorHandler(errorHandler);
+        recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
+        recipientListProcessor.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+        recipientListProcessor.setCacheSize(getCacheSize());
+        recipientListProcessor.setId(getId());
+        recipientListProcessor.setRouteId(getRouteId());
+
+        ServiceHelper.startService(aggregationStrategy, producerCache, recipientListProcessor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(producerCache, aggregationStrategy);
+        ServiceHelper.stopService(producerCache, aggregationStrategy, recipientListProcessor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
+        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy, recipientListProcessor);
 
         if (aggregateExecutorService != null) {
             camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index e1dacf3..f72eede 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 public class RecipientListProcessor extends MulticastProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
-    private final Iterator<?> iter;
     private boolean ignoreInvalidEndpoints;
     private final ProducerCache producerCache;
     private int cacheSize;
@@ -149,20 +148,18 @@ public class RecipientListProcessor extends MulticastProcessor {
 
     // TODO: camel-bean @RecipientList cacheSize
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter) {
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache) {
         super(camelContext, route, null);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy) {
         super(camelContext, route, null, aggregationStrategy);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException,
@@ -172,7 +169,6 @@ public class RecipientListProcessor extends MulticastProcessor {
               streaming, stopOnException, timeout, onPrepare,
               shareUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
     public int getCacheSize() {
@@ -192,7 +188,8 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
         // here we iterate the recipient lists and create the exchange pair for each of those
         List<ProcessorExchangePair> result = new ArrayList<>();
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 58851cc..217b5bf 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
+        // iter is only currently used by Recipient List EIP so its null
+
         Object value = expression.evaluate(exchange, Object.class);
         if (exchange.getException() != null) {
             // force any exceptions occurred during evaluation to be thrown