You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/07 12:36:27 UTC

[camel] branch master updated (d2b0862 -> 75a3463)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from d2b0862  CAMEL-16465 - Camel-AWS: Add useDefaultCredentialProvider option to all the components - Kinesis component
     new 6a1d982  CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
     new e2faf88  CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
     new 1e4b501  CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
     new 75a3463  CAMEL-16462: camel-core - Optimize Multicast EIP to reduce object allocations.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/processor/MulticastProcessor.java |  40 ++----
 .../org/apache/camel/processor/RecipientList.java  |  43 +-----
 .../camel/processor/RecipientListProcessor.java    | 148 ++++++++++++++-------
 .../java/org/apache/camel/processor/Splitter.java  |   2 +-
 .../apache/camel/support/cache/ServicePool.java    |  29 ++--
 .../java/org/apache/camel/util/ObjectHelper.java   |   3 +
 6 files changed, 138 insertions(+), 127 deletions(-)

[camel] 02/04: CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e2faf88ecbf0bc0ee64b03ea8e7cbd90051e7b97
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 13:41:47 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../src/main/java/org/apache/camel/processor/MulticastProcessor.java  | 4 ++--
 .../main/java/org/apache/camel/processor/RecipientListProcessor.java  | 4 ++--
 .../src/main/java/org/apache/camel/processor/Splitter.java            | 2 +-
 core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java | 3 +++
 4 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 96d3502..a7e913b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -326,7 +326,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
     protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter, int size) {
         Iterable<ProcessorExchangePair> pairs;
         try {
-            pairs = createProcessorExchangePairs(exchange, iter);
+            pairs = createProcessorExchangePairs(exchange, iter, size);
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
@@ -925,7 +925,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
     }
 
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
             throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index c5927fc..2e7021d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -188,10 +188,10 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
             throws Exception {
         // here we iterate the recipient lists and create the exchange pair for each of those
-        List<ProcessorExchangePair> result = new ArrayList<>();
+        List<ProcessorExchangePair> result = size > 0 ? new ArrayList<>(size) : new ArrayList<>();
 
         // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
         int index = 0;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 217b5bf..dd69a2f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
             throws Exception {
         // iter is only currently used by Recipient List EIP so its null
 
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
index 724a9cb..3a0be31 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -891,6 +891,9 @@ public final class ObjectHelper {
         return false;
     }
 
+    /**
+     * Used by camel-bean
+     */
     public static int arrayLength(Object[] pojo) {
         return pojo.length;
     }

[camel] 01/04: CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6a1d982212725e7e3bb6baa43c579b6d3779f14e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 11:53:32 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../camel/processor/RecipientListProcessor.java    |  6 ++++-
 .../apache/camel/support/cache/ServicePool.java    | 29 ++++++++++++++--------
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index f72eede..c5927fc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -246,7 +246,7 @@ public class RecipientListProcessor extends MulticastProcessor {
         }
 
         // set property which endpoint we send to
-        setToEndpoint(copy, producer);
+        setToEndpoint(copy, endpoint);
 
         // rework error handling to support fine grained error handling
         Route route = ExchangeHelper.getRoute(exchange);
@@ -327,6 +327,10 @@ public class RecipientListProcessor extends MulticastProcessor {
         return null;
     }
 
+    protected static void setToEndpoint(Exchange exchange, Endpoint endpoint) {
+        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, endpoint.getEndpointUri());
+    }
+
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index 28e9ff2..0dc1931 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -127,12 +127,17 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
     }
 
     private Pool<S> getOrCreatePool(Endpoint endpoint) {
-        boolean singleton = endpoint.isSingletonProducer();
-        if (singleton) {
-            return pool.computeIfAbsent(endpoint, SinglePool::new);
-        } else {
-            return pool.computeIfAbsent(endpoint, MultiplePool::new);
+        // its a pool so we have a lot more hits, so use regular get, and then fallback to computeIfAbsent
+        Pool<S> answer = pool.get(endpoint);
+        if (answer == null) {
+            boolean singleton = endpoint.isSingletonProducer();
+            if (singleton) {
+                answer = pool.computeIfAbsent(endpoint, SinglePool::new);
+            } else {
+                answer = pool.computeIfAbsent(endpoint, MultiplePool::new);
+            }
         }
+        return answer;
     }
 
     /**
@@ -261,12 +266,14 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         }
 
         private void cleanupEvicts() {
-            for (Map.Entry<Endpoint, Pool<S>> entry : singlePoolEvicted.entrySet()) {
-                Endpoint e = entry.getKey();
-                Pool<S> p = entry.getValue();
-                doStop(e);
-                p.stop();
-                singlePoolEvicted.remove(e);
+            if (!singlePoolEvicted.isEmpty()) {
+                for (Map.Entry<Endpoint, Pool<S>> entry : singlePoolEvicted.entrySet()) {
+                    Endpoint e = entry.getKey();
+                    Pool<S> p = entry.getValue();
+                    doStop(e);
+                    p.stop();
+                    singlePoolEvicted.remove(e);
+                }
             }
         }
 

[camel] 04/04: CAMEL-16462: camel-core - Optimize Multicast EIP to reduce object allocations.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75a34636810791ed2407d2e6a687637e012aad4d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 14:34:09 2021 +0200

    CAMEL-16462: camel-core - Optimize Multicast EIP to reduce object allocations.
---
 .../apache/camel/processor/MulticastProcessor.java | 30 +++++-----------------
 1 file changed, 7 insertions(+), 23 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index cd84349..0d2eaba 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -71,7 +71,6 @@ import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
 import org.slf4j.Logger;
@@ -140,19 +139,6 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     }
 
-    /**
-     * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges
-     * <p/>
-     * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
-     */
-    static final class ErrorHandlerKey extends KeyValueHolder<Route, Processor> {
-
-        ErrorHandlerKey(Route key, Processor value) {
-            super(key, value);
-        }
-
-    }
-
     private final class Scheduler implements Executor {
 
         @Override
@@ -183,7 +169,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
     private ExecutorService aggregateExecutorService;
     private boolean shutdownAggregateExecutorService;
     private final long timeout;
-    private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Processor, Processor> errorHandlers = new ConcurrentHashMap<>();
     private final boolean shareUnitOfWork;
 
     public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors) {
@@ -1013,32 +999,30 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     protected Processor wrapInErrorHandler(Route route, Exchange exchange, Processor processor) {
         Processor answer;
+        Processor key = processor;
 
         if (route != this.route && this.route != null) {
             throw new UnsupportedOperationException("Is this really correct ?");
         }
-        boolean tryBlock = exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, boolean.class);
+        Boolean tryBlock = (Boolean) exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
 
         // do not wrap in error handler if we are inside a try block
-        if (!tryBlock && route != null) {
+        if (route != null && (tryBlock == null || !tryBlock)) {
             // wrap the producer in error handler so we have fine grained error handling on
             // the output side instead of the input side
             // this is needed to support redelivery on that output alone and not doing redelivery
             // for the entire multicast block again which will start from scratch again
 
-            // create key for cache
-            final ErrorHandlerKey key = new ErrorHandlerKey(route, processor);
-
             // lookup cached first to reuse and preserve memory
             answer = errorHandlers.get(key);
             if (answer != null) {
-                LOG.trace("Using existing error handler for: {}", processor);
+                LOG.trace("Using existing error handler for: {}", key);
                 return answer;
             }
 
-            LOG.trace("Creating error handler for: {}", processor);
+            LOG.trace("Creating error handler for: {}", key);
             try {
-                processor = wrapInErrorHandler(route, processor);
+                processor = wrapInErrorHandler(route, key);
 
                 // and wrap in unit of work processor so the copy exchange also can run under UoW
                 answer = createUnitOfWorkProcessor(route, processor, exchange);

[camel] 03/04: CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1e4b501bb36ecdc289aef1e0ea7c84027d1fc281
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 14:11:04 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../apache/camel/processor/MulticastProcessor.java |  10 +-
 .../org/apache/camel/processor/RecipientList.java  |  43 +------
 .../camel/processor/RecipientListProcessor.java    | 142 ++++++++++++++-------
 .../java/org/apache/camel/processor/Splitter.java  |   2 +-
 4 files changed, 105 insertions(+), 92 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index a7e913b..cd84349 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,13 +320,11 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        return process(exchange, callback, null, 0);
-    }
-
-    protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter, int size) {
         Iterable<ProcessorExchangePair> pairs;
+        // TODO: optimize size
+        int size = 0;
         try {
-            pairs = createProcessorExchangePairs(exchange, iter, size);
+            pairs = createProcessorExchangePairs(exchange);
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
@@ -925,7 +923,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
     }
 
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange)
             throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index 4a6c551..c8a36de 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.processor;
 
-import java.lang.reflect.Array;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -36,7 +33,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.support.cache.DefaultProducerCache;
 import org.apache.camel.support.cache.EmptyProducerCache;
 import org.apache.camel.support.service.ServiceHelper;
@@ -54,13 +50,12 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
 
     private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
 
-    private static final String IGNORE_DELIMITER_MARKER = "false";
     private final CamelContext camelContext;
     private String id;
     private String routeId;
     private Processor errorHandler;
     private ProducerCache producerCache;
-    private Expression expression;
+    private final Expression expression;
     private final String delimiter;
     private boolean parallelProcessing;
     private boolean parallelAggregate;
@@ -88,6 +83,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
         StringHelper.notEmpty(delimiter, "delimiter");
         this.camelContext = camelContext;
         this.delimiter = delimiter;
+        this.expression = null;
     }
 
     public RecipientList(CamelContext camelContext, Expression expression) {
@@ -177,38 +173,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
         if (!isStarted()) {
             throw new IllegalStateException("RecipientList has not been started: " + this);
         }
-
-        // use the evaluate expression result if exists
-        Object recipientList = exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT);
-        if (recipientList == null && expression != null) {
-            // fallback and evaluate the expression
-            recipientList = expression.evaluate(exchange, Object.class);
-        }
-
-        return sendToRecipientList(exchange, recipientList, callback);
-    }
-
-    /**
-     * Sends the given exchange to the recipient list
-     */
-    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
-        // optimize to calculate number of recipients if possible
-        int size = 0;
-        if (recipientList instanceof Collection) {
-            size = ((Collection<?>) recipientList).size();
-        } else if (recipientList.getClass().isArray()) {
-            size = Array.getLength(recipientList);
-        }
-        Iterator<?> iter;
-
-        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
-            iter = ObjectHelper.createIterator(recipientList, null);
-        } else {
-            iter = ObjectHelper.createIterator(recipientList, delimiter);
-        }
-
-        // now let the multicast process the exchange
-        return recipientListProcessor.process(exchange, callback, iter, size);
+        return recipientListProcessor.process(exchange, callback);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
@@ -238,7 +203,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
         }
 
         recipientListProcessor = new RecipientListProcessor(
-                camelContext, null, producerCache, getAggregationStrategy(),
+                camelContext, null, expression, delimiter, producerCache, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
                 isStopOnAggregateException());
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 2e7021d..720c937 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.lang.reflect.Array;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
@@ -39,6 +42,7 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +60,12 @@ import org.slf4j.LoggerFactory;
 public class RecipientListProcessor extends MulticastProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
+
+    private static final String IGNORE_DELIMITER_MARKER = "false";
+
     private boolean ignoreInvalidEndpoints;
+    private final Expression expression;
+    private final String delimiter;
     private final ProducerCache producerCache;
     private int cacheSize;
 
@@ -147,19 +156,8 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     // TODO: camel-bean @RecipientList cacheSize
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache) {
-        super(camelContext, route, null);
-        this.producerCache = producerCache;
-    }
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
-                                  AggregationStrategy aggregationStrategy) {
-        super(camelContext, route, null, aggregationStrategy);
-        this.producerCache = producerCache;
-    }
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
+    public RecipientListProcessor(CamelContext camelContext, Route route, Expression expression, String delimiter,
+                                  ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException,
@@ -168,6 +166,8 @@ public class RecipientListProcessor extends MulticastProcessor {
         super(camelContext, route, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
               streaming, stopOnException, timeout, onPrepare,
               shareUnitOfWork, parallelAggregate, stopOnAggregateException);
+        this.expression = expression;
+        this.delimiter = delimiter;
         this.producerCache = producerCache;
     }
 
@@ -188,47 +188,97 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange)
             throws Exception {
-        // here we iterate the recipient lists and create the exchange pair for each of those
-        List<ProcessorExchangePair> result = size > 0 ? new ArrayList<>(size) : new ArrayList<>();
 
-        // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
+        // use the evaluate expression result if exists
+        Object recipientList = exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT);
+        if (recipientList == null && expression != null) {
+            // fallback and evaluate the expression
+            recipientList = expression.evaluate(exchange, Object.class);
+        }
+
+        // optimize for recipient without need for using delimiter
+        // (if its list/collection/array type)
+        if (recipientList instanceof List) {
+            List col = (List) recipientList;
+            int size = col.size();
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (int i = 0; i < size; i++) {
+                Object recipient = col.get(i);
+                index = doCreateProcessorExchangePairs(exchange, recipient, result, index);
+            }
+            return result;
+        } else if (recipientList instanceof Collection) {
+            Collection col = (Collection) recipientList;
+            int size = col.size();
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (Object recipient : col) {
+                index = doCreateProcessorExchangePairs(exchange, recipient, result, index);
+            }
+            return result;
+        } else if (recipientList.getClass().isArray()) {
+            Object[] arr = (Object[]) recipientList;
+            int size = Array.getLength(recipientList);
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (int i = 0; i < size; i++) {
+                Object recipient = arr[i];
+                index = doCreateProcessorExchangePairs(exchange, recipient, result, index);
+            }
+            return result;
+        }
+
+        // okay we have to use iterator based separated by delimiter
+        Iterator<?> iter;
+        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
+            iter = ObjectHelper.createIterator(recipientList, null);
+        } else {
+            iter = ObjectHelper.createIterator(recipientList, delimiter);
+        }
+        List<ProcessorExchangePair> result = new ArrayList<>();
         int index = 0;
         while (iter.hasNext()) {
-            boolean prototype = cacheSize < 0;
+            index = doCreateProcessorExchangePairs(exchange, iter.next(), result, index);
+        }
+        return result;
+    }
 
-            Object recipient = iter.next();
-            Endpoint endpoint;
-            Producer producer;
-            ExchangePattern pattern;
-            try {
-                recipient = prepareRecipient(exchange, recipient);
-                Endpoint existing = getExistingEndpoint(exchange, recipient);
-                if (existing == null) {
-                    endpoint = resolveEndpoint(exchange, recipient, prototype);
-                } else {
-                    endpoint = existing;
-                    // we have an existing endpoint then its not a prototype scope
-                    prototype = false;
-                }
-                pattern = resolveExchangePattern(recipient);
-                producer = producerCache.acquireProducer(endpoint);
-            } catch (Exception e) {
-                if (isIgnoreInvalidEndpoints()) {
-                    LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
-                    continue;
-                } else {
-                    // failure so break out
-                    throw e;
-                }
+    private int doCreateProcessorExchangePairs(
+            Exchange exchange, Object recipient, List<ProcessorExchangePair> result, int index)
+            throws NoTypeConversionAvailableException {
+        boolean prototype = cacheSize < 0;
+
+        Endpoint endpoint;
+        Producer producer;
+        ExchangePattern pattern;
+        try {
+            recipient = prepareRecipient(exchange, recipient);
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
+            pattern = resolveExchangePattern(recipient);
+            producer = producerCache.acquireProducer(endpoint);
+        } catch (Exception e) {
+            if (isIgnoreInvalidEndpoints()) {
+                LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
+                return index;
+            } else {
+                // failure so break out
+                throw e;
             }
-
-            // then create the exchange pair
-            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype));
         }
 
-        return result;
+        // then create the exchange pair
+        result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype));
+        return index;
     }
 
     /**
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index dd69a2f..1d2b444 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange)
             throws Exception {
         // iter is only currently used by Recipient List EIP so its null