You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:29 UTC

[camel] branch master updated (86506c1 -> b16a392)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 86506c1  Camel-AWS2-Translate: Correct the way we return errorCode
     new 0e5dde0  CAMEL-14596: camel-core - Optimize toD when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
     new 1859881  CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
     new 9990d58  CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
     new 94910c6  CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
     new 8cea4e7  CAMEL-14596: Update cache size documentation
     new 3698419  CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users
     new 5d09651  CAMEL-14596: Update upgrade guide
     new 3c031d8  CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users
     new fae9c49  CAMEL-14596: Update cache size documentation
     new 9efd2fc  CAMEL-14596: Regen
     new b16a392  CAMEL-14596: Fixed test

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/camel/CamelContext.java   |  13 +++
 .../org/apache/camel/ExtendedCamelContext.java     |  16 +++
 .../camel/impl/engine/AbstractCamelContext.java    |  60 ++++++++---
 .../java/org/apache/camel/processor/Enricher.java  |  55 ++++++++--
 .../org/apache/camel/processor/PollEnricher.java   |  46 +++++++-
 .../org/apache/camel/processor/RecipientList.java  |   1 +
 .../camel/processor/RecipientListProcessor.java    |  61 +++++++++--
 .../org/apache/camel/processor/RoutingSlip.java    |  82 +++++++++-----
 .../camel/processor/SendDynamicProcessor.java      |  35 +++++-
 .../org/apache/camel/model/dynamicRouter.json      |   2 +-
 .../resources/org/apache/camel/model/enrich.json   |   2 +-
 .../org/apache/camel/model/pollEnrich.json         |   2 +-
 .../org/apache/camel/model/recipientList.json      |   2 +-
 .../org/apache/camel/model/routingSlip.json        |   2 +-
 .../src/main/docs/eips/dynamic-router.adoc         |  36 +++----
 .../src/main/docs/eips/dynamicRouter-eip.adoc      |   4 +-
 .../src/main/docs/eips/enrich-eip.adoc             |   4 +-
 .../src/main/docs/eips/pollEnrich-eip.adoc         |   4 +-
 .../src/main/docs/eips/recipientList-eip.adoc      |   4 +-
 .../src/main/docs/eips/routingSlip-eip.adoc        |   3 +-
 .../src/main/docs/eips/wireTap-eip.adoc            |   4 +-
 .../camel/model/DynamicRouterDefinition.java       |  40 +++++++
 .../org/apache/camel/model/EnrichDefinition.java   |  40 +++++++
 .../apache/camel/model/PollEnrichDefinition.java   |  40 +++++++
 .../camel/model/RecipientListDefinition.java       |  40 +++++++
 .../apache/camel/model/RoutingSlipDefinition.java  |  26 +++++
 .../org/apache/camel/model/WireTapDefinition.java  |  26 +++++
 .../org/apache/camel/reifier/EnrichReifier.java    |   3 +
 .../apache/camel/impl/EndpointPrototypeTest.java   | 119 +++++++++++++++++++++
 ...acheTest.java => DynamicRouterNoCacheTest.java} |  58 ++++++----
 ...ListNoCacheTest.java => EnrichNoCacheTest.java} |  56 ++++++----
 ...NoCacheTest.java => PollEnrichNoCacheTest.java} |  64 ++++++-----
 .../camel/processor/RecipientListNoCacheTest.java  |  33 ++++++
 .../camel/processor/RoutingSlipNoCacheTest.java    |  22 +++-
 ...tNoCacheTest.java => ToDynamicNoCacheTest.java} |  56 ++++++----
 .../apache/camel/processor/WireTapNoCacheTest.java |  91 ++++++++++++++++
 .../apache/camel/support/CamelContextHelper.java   |  15 +++
 .../org/apache/camel/support/ExchangeHelper.java   |  27 +++++
 .../modules/ROOT/pages/camel-3x-upgrade-guide.adoc |  14 ++-
 .../modules/ROOT/pages/dynamic-router.adoc         |  36 +++----
 .../modules/ROOT/pages/dynamicRouter-eip.adoc      |   4 +-
 .../user-manual/modules/ROOT/pages/enrich-eip.adoc |   4 +-
 .../modules/ROOT/pages/pollEnrich-eip.adoc         |   4 +-
 .../modules/ROOT/pages/recipientList-eip.adoc      |   4 +-
 .../modules/ROOT/pages/routingSlip-eip.adoc        |   3 +-
 .../modules/ROOT/pages/wireTap-eip.adoc            |   4 +-
 46 files changed, 1056 insertions(+), 211 deletions(-)
 create mode 100644 core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
 copy core/camel-core/src/test/java/org/apache/camel/processor/{RecipientListNoCacheTest.java => DynamicRouterNoCacheTest.java} (61%)
 copy core/camel-core/src/test/java/org/apache/camel/processor/{RecipientListNoCacheTest.java => EnrichNoCacheTest.java} (58%)
 copy core/camel-core/src/test/java/org/apache/camel/processor/{RecipientListNoCacheTest.java => PollEnrichNoCacheTest.java} (50%)
 copy core/camel-core/src/test/java/org/apache/camel/processor/{RecipientListNoCacheTest.java => ToDynamicNoCacheTest.java} (58%)
 create mode 100644 core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java


[camel] 06/11: CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3698419ae94f314c16b250ea6ae2d0403115191d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:55:48 2020 +0100

    CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users
---
 .../src/main/java/org/apache/camel/CamelContext.java     | 15 ---------------
 .../main/java/org/apache/camel/ExtendedCamelContext.java | 16 ++++++++++++++++
 .../org/apache/camel/support/CamelContextHelper.java     |  3 ++-
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index e834261..62e5c50 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -440,21 +440,6 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
     Endpoint getEndpoint(String uri);
 
     /**
-     * Resolves the given name to an {@link Endpoint} of the specified type (scope is prototype).
-     * If the name has a singleton endpoint registered, then the singleton is returned.
-     * Otherwise, a new {@link Endpoint} is created.
-     *
-     * The endpoint is NOT registered in the {@link org.apache.camel.spi.EndpointRegistry} as its prototype scoped,
-     * and therefore expected to be short lived and discarded after use.
-     *
-     * @param uri the URI of the endpoint
-     * @return the endpoint
-     *
-     * @see #getEndpoint(String)
-     */
-    Endpoint getPrototypeEndpoint(String uri);
-
-    /**
      * Resolves the given name to an {@link Endpoint} of the specified type.
      * If the name has a singleton endpoint registered, then the singleton is returned.
      * Otherwise, a new {@link Endpoint} is created and registered in the {@link org.apache.camel.spi.EndpointRegistry}.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 59ccf81..970d466 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -116,6 +116,22 @@ public interface ExtendedCamelContext extends CamelContext {
     void registerEndpointCallback(EndpointStrategy strategy);
 
     /**
+     * Resolves the given name to an {@link Endpoint} of the specified type (scope is prototype).
+     * If the name has a singleton endpoint registered, then the singleton is returned.
+     * Otherwise, a new {@link Endpoint} is created.
+     *
+     * The endpoint is NOT registered in the {@link org.apache.camel.spi.EndpointRegistry} as its prototype scoped,
+     * and therefore expected to be short lived and discarded after use (you must stop and shutdown the
+     * endpoint when no longer in use).
+     *
+     * @param uri the URI of the endpoint
+     * @return the endpoint
+     *
+     * @see #getEndpoint(String)
+     */
+    Endpoint getPrototypeEndpoint(String uri);
+
+    /**
      * Returns the order in which the route inputs was started.
      * <p/>
      * The order may not be according to the startupOrder defined on the route.
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
index 48a4510..1983d9a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
@@ -68,7 +68,8 @@ public final class CamelContextHelper {
      */
     public static Endpoint getMandatoryPrototypeEndpoint(CamelContext camelContext, String uri)
         throws NoSuchEndpointException {
-        Endpoint endpoint = camelContext.getPrototypeEndpoint(uri);
+        ExtendedCamelContext ecc = (ExtendedCamelContext) camelContext;
+        Endpoint endpoint = ecc.getPrototypeEndpoint(uri);
         if (endpoint == null) {
             throw new NoSuchEndpointException(uri);
         } else {


[camel] 09/11: CAMEL-14596: Update cache size documentation

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fae9c49da7222f254f87c731c0169e479623ba3d
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 11:29:50 2020 +0100

    CAMEL-14596: Update cache size documentation
---
 .../src/main/docs/eips/dynamic-router.adoc         | 36 ++++++++++------------
 .../src/main/docs/eips/dynamicRouter-eip.adoc      |  4 ++-
 .../src/main/docs/eips/enrich-eip.adoc             |  4 ++-
 .../src/main/docs/eips/pollEnrich-eip.adoc         |  4 ++-
 .../src/main/docs/eips/recipientList-eip.adoc      |  4 ++-
 .../src/main/docs/eips/routingSlip-eip.adoc        |  3 +-
 .../src/main/docs/eips/wireTap-eip.adoc            |  4 ++-
 7 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/core/camel-core-engine/src/main/docs/eips/dynamic-router.adoc b/core/camel-core-engine/src/main/docs/eips/dynamic-router.adoc
index dd6094f..f99f2e0 100644
--- a/core/camel-core-engine/src/main/docs/eips/dynamic-router.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/dynamic-router.adoc
@@ -18,26 +18,6 @@ You must ensure the expression used for the `dynamicRouter` such as a
 bean, will return `null` to indicate the end. Otherwise the
 `dynamicRouter` will keep repeating endlessly.
 
-[[DynamicRouter-Options]]
-== Options
-
-[width="100%",cols="10%,10%,80%",options="header",]
-|=======================================================================
-|Name |Default Value |Description
-
-|`uriDelimiter` |`,` |Delimiter used if the Expression returned multiple
-endpoints.
-
-|`ignoreInvalidEndpoints` |`false` |If an endpoint uri could not be resolved, should it be ignored.
-Otherwise Camel will thrown an exception stating the endpoint uri is not
-valid.
-
-|`cacheSize` |`1000` |Allows to configure the cache size for the
-`ProducerCache` which caches producers for reuse in the routing slip.
-Will by default use the default cache size which is 1000. Setting the
-value to -1 allows to turn off the cache all together.
-|=======================================================================
-
 [[DynamicRouter-DynamicRouterinCamel2.5onwards]]
 == Dynamic Router in Camel 2.5 onwards
 
@@ -48,6 +28,22 @@ allows you to know how far we have processed in the slip. (It's a slip
 because the Dynamic Router implementation is
 based on top of Routing Slip).
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
+== Options
+
+// eip options: START
+The Dynamic Router EIP supports 3 options which are listed below:
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *uriDelimiter* | Sets the uri delimiter to use | , | String
+| *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and  [...]
+|===
+// eip options: END
+
 [[DynamicRouter-JavaDSL]]
 == Java DSL
 
diff --git a/core/camel-core-engine/src/main/docs/eips/dynamicRouter-eip.adoc b/core/camel-core-engine/src/main/docs/eips/dynamicRouter-eip.adoc
index 9646c58..70010e3 100644
--- a/core/camel-core-engine/src/main/docs/eips/dynamicRouter-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/dynamicRouter-eip.adoc
@@ -15,6 +15,8 @@ _on-the-fly_.
 You must ensure the expression used for the `dynamicRouter` such as a bean, will return `null` to indicate the end. Otherwise the `dynamicRouter` will keep repeating endlessly.
 ===
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -25,7 +27,7 @@ The Dynamic Router EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 |===
 // eip options: END
 
diff --git a/core/camel-core-engine/src/main/docs/eips/enrich-eip.adoc b/core/camel-core-engine/src/main/docs/eips/enrich-eip.adoc
index 9a82b39..8f40c81 100644
--- a/core/camel-core-engine/src/main/docs/eips/enrich-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/enrich-eip.adoc
@@ -5,6 +5,8 @@ Camel supports the Content Enricher from the EIP patterns using a Message Transl
 
 image::eip/DataEnricher.gif[image]
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 // eip options: START
 The Enrich EIP supports 7 options which are listed below:
 
@@ -16,7 +18,7 @@ The Enrich EIP supports 7 options which are listed below:
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduc [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/core/camel-core-engine/src/main/docs/eips/pollEnrich-eip.adoc b/core/camel-core-engine/src/main/docs/eips/pollEnrich-eip.adoc
index 2aa8b21..87d74c0 100644
--- a/core/camel-core-engine/src/main/docs/eips/pollEnrich-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/pollEnrich-eip.adoc
@@ -22,6 +22,8 @@ The timeout values is in millis.
 `pollEnrich` does *not* access any data from the current Exchange which means when polling it cannot use any of the existing headers you may have set on the Exchange. For example you cannot set a filename in the `Exchange.FILE_NAME` header and use `pollEnrich` to consume only that file. For that you *must* set the filename in the endpoint URI.
 Both `enrich` and `pollEnrich` supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. In other words all what is told above no longer apply and it just works.
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Content enrichment using pollEnrich
 The `pollEnrich` works just as the `enrich` however as it uses a Polling Consumer we have 3 methods when polling
 
@@ -42,7 +44,7 @@ The Poll Enrich EIP supports 7 options which are listed below:
 | *strategyMethodName* | This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. |  | String
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This redu [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/core/camel-core-engine/src/main/docs/eips/recipientList-eip.adoc b/core/camel-core-engine/src/main/docs/eips/recipientList-eip.adoc
index ee1c14c..0c91120 100644
--- a/core/camel-core-engine/src/main/docs/eips/recipientList-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/recipientList-eip.adoc
@@ -6,6 +6,8 @@ image::eip/RecipientList.gif[image]
 
 The recipients will receive a copy of the *same* Exchange, and Camel will execute them sequentially.
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -26,7 +28,7 @@ The Recipient List EIP supports 15 options which are listed below:
 | *timeout* | Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shu [...]
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 | *parallelAggregate* | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. | false | Boolean
 | *stopOnAggregateException* | If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this behavior. The default value is false for the sake of backward compatibility. | false | Boolean
 |===
diff --git a/core/camel-core-engine/src/main/docs/eips/routingSlip-eip.adoc b/core/camel-core-engine/src/main/docs/eips/routingSlip-eip.adoc
index 87fdc39..808380b 100644
--- a/core/camel-core-engine/src/main/docs/eips/routingSlip-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/routingSlip-eip.adoc
@@ -4,6 +4,7 @@ The Routing Slip from the https://camel.apache.org/enterprise-integration-patter
 
 image::eip/RoutingTableSimple.gif[image]
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
 
 == Options
 
@@ -15,7 +16,7 @@ The Routing Slip EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and  [...]
 |===
 // eip options: END
 
diff --git a/core/camel-core-engine/src/main/docs/eips/wireTap-eip.adoc b/core/camel-core-engine/src/main/docs/eips/wireTap-eip.adoc
index d1fa452..7082de2 100644
--- a/core/camel-core-engine/src/main/docs/eips/wireTap-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/eips/wireTap-eip.adoc
@@ -15,6 +15,8 @@ should consider enabling xref:stream-caching.adoc[Stream caching] to
 ensure the message body can be read at each endpoint. See more details
 at xref:stream-caching.adoc[Stream caching].
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -31,7 +33,7 @@ The Wire Tap EIP supports 11 options which are listed below:
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse producers. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===


[camel] 01/11: CAMEL-14596: camel-core - Optimize toD when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0e5dde03039a6c85298acbd13330072875949a75
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 20 16:06:03 2020 +0100

    CAMEL-14596: camel-core - Optimize toD when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../main/java/org/apache/camel/CamelContext.java   |  28 +++++
 .../camel/impl/engine/AbstractCamelContext.java    |  60 ++++++++---
 .../camel/processor/SendDynamicProcessor.java      |  35 +++++-
 .../apache/camel/impl/EndpointPrototypeTest.java   | 118 +++++++++++++++++++++
 .../camel/processor/ToDynamicNoCacheTest.java      |  91 ++++++++++++++++
 .../apache/camel/support/CamelContextHelper.java   |  14 +++
 .../org/apache/camel/support/ExchangeHelper.java   |  21 ++++
 7 files changed, 346 insertions(+), 21 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index bb122b3..e834261 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -268,6 +268,17 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
     void addService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception;
 
     /**
+     * Adds a service to this CamelContext (prototype scope).
+     * <p/>
+     * The service will also have {@link CamelContext} injected if its {@link CamelContextAware}.
+     * The service will be started, if its not already started.
+     *
+     * @param object the service
+     * @throws Exception can be thrown when starting the service
+     */
+    void addPrototypeService(Object object) throws Exception;
+
+    /**
      * Removes a service from this CamelContext.
      * <p/>
      * The service is assumed to have been previously added using {@link #addService(Object)} method.
@@ -423,10 +434,27 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
      *
      * @param uri the URI of the endpoint
      * @return the endpoint
+     *
+     * @see #getPrototypeEndpoint(String)
      */
     Endpoint getEndpoint(String uri);
 
     /**
+     * Resolves the given name to an {@link Endpoint} of the specified type (scope is prototype).
+     * If the name has a singleton endpoint registered, then the singleton is returned.
+     * Otherwise, a new {@link Endpoint} is created.
+     *
+     * The endpoint is NOT registered in the {@link org.apache.camel.spi.EndpointRegistry} as its prototype scoped,
+     * and therefore expected to be short lived and discarded after use.
+     *
+     * @param uri the URI of the endpoint
+     * @return the endpoint
+     *
+     * @see #getEndpoint(String)
+     */
+    Endpoint getPrototypeEndpoint(String uri);
+
+    /**
      * Resolves the given name to an {@link Endpoint} of the specified type.
      * If the name has a singleton endpoint registered, then the singleton is returned.
      * Otherwise, a new {@link Endpoint} is created and registered in the {@link org.apache.camel.spi.EndpointRegistry}.
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index bfbeec5..7e5592c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -731,6 +731,15 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
     @Override
     public Endpoint getEndpoint(String uri) {
+        return doGetEndpoint(uri, false);
+    }
+
+    @Override
+    public Endpoint getPrototypeEndpoint(String uri) {
+        return doGetEndpoint(uri, true);
+    }
+
+    protected Endpoint doGetEndpoint(String uri, boolean prototype) {
         // ensure CamelContext are initialized before we can get an endpoint
         init();
 
@@ -758,7 +767,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         String scheme = null;
         // use optimized method to get the endpoint uri
         EndpointKey key = getEndpointKeyPreNormalized(uri);
-        answer = endpoints.get(key);
+        // only lookup and reuse existing endpoints if not prototype scoped
+        answer = prototype ? null : endpoints.get(key);
         if (answer == null) {
             try {
                 // Use the URI prefix to find the component.
@@ -820,8 +830,18 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
                 }
 
                 if (answer != null) {
-                    addService(answer);
-                    answer = addEndpointToRegistry(uri, answer);
+                    if (!prototype) {
+                        addService(answer);
+                        // register in registry
+                        answer = addEndpointToRegistry(uri, answer);
+                    } else {
+                        addPrototypeService(answer);
+                        // if there is endpoint strategies, then use the endpoints they return
+                        // as this allows to intercept endpoints etc.
+                        for (EndpointStrategy strategy : endpointStrategies) {
+                            answer = strategy.registerEndpoint(uri, answer);
+                        }
+                    }
                 }
             } catch (Exception e) {
                 throw new ResolveEndpointFailedException(uri, e);
@@ -1387,7 +1407,12 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
     @Override
     public void addService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception {
-        internalAddService(object, stopOnShutdown, forceStart);
+        internalAddService(object, stopOnShutdown, forceStart, true);
+    }
+
+    @Override
+    public void addPrototypeService(Object object) throws Exception {
+        doAddService(object, false, true, false);
     }
 
     protected <T> T doAddService(T object) {
@@ -1395,19 +1420,20 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
     }
 
     protected <T> T doAddService(T object, boolean stopOnShutdown) {
-        return doAddService(object, stopOnShutdown, true);
+        return doAddService(object, stopOnShutdown, true, true);
     }
 
-    protected <T> T doAddService(T object, boolean stopOnShutdown, boolean forceStart) {
+    protected <T> T doAddService(T object, boolean stopOnShutdown, boolean forceStart, boolean useLifecycleStrategies) {
         try {
-            internalAddService(object, stopOnShutdown, forceStart);
+            internalAddService(object, stopOnShutdown, forceStart, useLifecycleStrategies);
         } catch (Exception e) {
             throw RuntimeCamelException.wrapRuntimeCamelException(e);
         }
         return object;
     }
 
-    private void internalAddService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception {
+    private void internalAddService(Object object, boolean stopOnShutdown,
+                                    boolean forceStart, boolean useLifecycleStrategies) throws Exception {
 
         // inject CamelContext
         if (object instanceof CamelContextAware) {
@@ -1418,13 +1444,15 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
         if (object instanceof Service) {
             Service service = (Service)object;
 
-            for (LifecycleStrategy strategy : lifecycleStrategies) {
-                if (service instanceof Endpoint) {
-                    // use specialized endpoint add
-                    strategy.onEndpointAdd((Endpoint)service);
-                } else {
-                    Route route = setupRoute.get();
-                    strategy.onServiceAdd(this, service, route);
+            if (useLifecycleStrategies) {
+                for (LifecycleStrategy strategy : lifecycleStrategies) {
+                    if (service instanceof Endpoint) {
+                        // use specialized endpoint add
+                        strategy.onEndpointAdd((Endpoint) service);
+                    } else {
+                        Route route = setupRoute.get();
+                        strategy.onServiceAdd(this, service, route);
+                    }
                 }
             }
 
@@ -4098,7 +4126,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
 
     @Override
     public void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy) {
-        this.streamCachingStrategy = doAddService(streamCachingStrategy, true, false);
+        this.streamCachingStrategy = doAddService(streamCachingStrategy, true, false, true);
     }
 
     @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index dabc2f9..11fa8ef 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -113,6 +113,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
         Processor preAwareProcessor = null;
         Processor postAwareProcessor = null;
         String staticUri = null;
+        boolean prototype = cacheSize < 0;
         try {
             recipient = expression.evaluate(exchange, Object.class);
             if (dynamicAware != null) {
@@ -134,10 +135,14 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
                     }
                 }
             }
-            if (staticUri != null) {
-                endpoint = resolveEndpoint(exchange, staticUri);
+            Object targetRecipient = staticUri != null ? staticUri : recipient;
+            Endpoint existing = getExistingEndpoint(exchange, targetRecipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, targetRecipient, prototype);
             } else {
-                endpoint = resolveEndpoint(exchange, recipient);
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
             }
             if (endpoint == null) {
                 if (LOG.isDebugEnabled()) {
@@ -165,6 +170,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
         final Processor postProcessor = postAwareProcessor;
         // destination exchange pattern overrides pattern
         final ExchangePattern pattern = destinationExchangePattern != null ? destinationExchangePattern : this.pattern;
+        final boolean stopEndpoint = prototype;
         return producerCache.doInAsyncProducer(endpoint, exchange, callback, (p, e, c) -> {
             final Exchange target = configureExchange(e, pattern, endpoint);
             try {
@@ -191,6 +197,10 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
                     } catch (Throwable e) {
                         target.setException(e);
                     }
+                    // stop endpoint if prototype as it was only used once
+                    if (stopEndpoint) {
+                        ServiceHelper.stopAndShutdownService(endpoint);
+                    }
                     // signal we are done
                     c.done(doneSync);
                 }
@@ -228,7 +238,22 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
         return ExchangeHelper.resolveScheme(uri);
     }
 
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
             recipient = ((String) recipient).trim();
@@ -240,7 +265,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
         }
 
         if (recipient != null) {
-            return ExchangeHelper.resolveEndpoint(exchange, recipient);
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
         } else {
             return null;
         }
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
new file mode 100644
index 0000000..c4523f7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.LifecycleStrategySupport;
+import org.junit.Test;
+
+public class EndpointPrototypeTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testGetPrototype() throws Exception {
+        context.start();
+
+        assertEquals(0, context.getEndpointRegistry().size());
+
+        context.getEndpoint("mock:foo");
+
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now get a prototype which should not be added
+
+        Endpoint prototype = context.getPrototypeEndpoint("mock:bar");
+        assertNotNull(prototype);
+
+        // and should be started
+        MockEndpoint bar = assertIsInstanceOf(MockEndpoint.class, prototype);
+        assertTrue(bar.getStatus().isStarted());
+
+        // but registry is still at 1
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // and now at 2
+        context.getEndpoint("mock:foo2");
+        assertEquals(2, context.getEndpointRegistry().size());
+
+        context.stop();
+
+        // should not be stopped as we need to handle that ourselves due to prototype scoped
+        assertFalse(bar.getStatus().isStopped());
+        bar.stop();
+        assertTrue(bar.getStatus().isStopped());
+    }
+
+    @Test
+    public void testGetPrototypeNoLifecycleStrategy() throws Exception {
+        final List<Endpoint> endpoints = new ArrayList<>();
+
+        LifecycleStrategySupport dummy = new LifecycleStrategySupport() {
+            @Override
+            public void onEndpointAdd(Endpoint endpoint) {
+                endpoints.add(endpoint);
+            }
+        };
+
+        context.addLifecycleStrategy(dummy);
+        context.start();
+
+        assertEquals(0, context.getEndpointRegistry().size());
+
+        context.getEndpoint("mock:foo");
+
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now get a prototype which should not be added
+
+        Endpoint prototype = context.getPrototypeEndpoint("mock:bar");
+        assertNotNull(prototype);
+
+        // and should be started
+        MockEndpoint bar = assertIsInstanceOf(MockEndpoint.class, prototype);
+        assertTrue(bar.getStatus().isStarted());
+
+        // but registry is still at 1
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // and now at 2
+        context.getEndpoint("mock:foo2");
+        assertEquals(2, context.getEndpointRegistry().size());
+
+        context.stop();
+
+        // should not be stopped as we need to handle that ourselves due to prototype scoped
+        assertFalse(bar.getStatus().isStopped());
+        bar.stop();
+        assertTrue(bar.getStatus().isStopped());
+
+        // should only be mock:foo, mock:foo2, and no mock:bar
+        assertEquals(2, endpoints.size());
+        assertEquals("mock://foo", endpoints.get(0).getEndpointUri());
+        assertEquals("mock://foo2", endpoints.get(1).getEndpointUri());
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicNoCacheTest.java
new file mode 100644
index 0000000..29f1f88
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ToDynamicNoCacheTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class ToDynamicNoCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        SendDynamicProcessor sdp = (SendDynamicProcessor) list.get(0);
+        assertNotNull(sdp);
+        assertEquals(-1, sdp.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
+
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "mock:dummy");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a")
+                        .toD("${header.myHeader}", -1).id("foo");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
index c4eab76..48a4510 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
@@ -63,6 +63,20 @@ public final class CamelContextHelper {
     }
 
     /**
+     * Returns the mandatory endpoint (prototype scope) for the given URI or the
+     * {@link org.apache.camel.NoSuchEndpointException} is thrown
+     */
+    public static Endpoint getMandatoryPrototypeEndpoint(CamelContext camelContext, String uri)
+        throws NoSuchEndpointException {
+        Endpoint endpoint = camelContext.getPrototypeEndpoint(uri);
+        if (endpoint == null) {
+            throw new NoSuchEndpointException(uri);
+        } else {
+            return endpoint;
+        }
+    }
+
+    /**
      * Returns the mandatory endpoint for the given URI and type or the
      * {@link org.apache.camel.NoSuchEndpointException} is thrown
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index f66b576..bfac356 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -100,6 +100,27 @@ public final class ExchangeHelper {
     }
 
     /**
+     * Attempts to resolve the endpoint (prototype scope) for the given value
+     *
+     * @param exchange the message exchange being processed
+     * @param value    the value which can be an {@link Endpoint} or an object
+     *                 which provides a String representation of an endpoint via
+     *                 {@link #toString()}
+     * @return the endpoint
+     * @throws NoSuchEndpointException if the endpoint cannot be resolved
+     */
+    public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        Endpoint endpoint;
+        if (value instanceof Endpoint) {
+            endpoint = (Endpoint) value;
+        } else {
+            String uri = value.toString().trim();
+            endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), uri);
+        }
+        return endpoint;
+    }
+
+    /**
      * Gets the mandatory property of the exchange of the correct type
      *
      * @param exchange      the exchange


[camel] 11/11: CAMEL-14596: Fixed test

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b16a3928691c993c3ab324aff3714d627806283b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 12:08:59 2020 +0100

    CAMEL-14596: Fixed test
---
 .../org/apache/camel/processor/WireTapNoCacheTest.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
index 44e3dc9..a54913b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
@@ -48,9 +48,9 @@ public class WireTapNoCacheTest extends ContextTestSupport {
 
         // now send again with mocks which then add endpoints
 
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
+        MockEndpoint x = getMockEndpoint("mock:x2");
+        MockEndpoint y = getMockEndpoint("mock:y2");
+        MockEndpoint z = getMockEndpoint("mock:z2");
 
         x.expectedBodiesReceivedInAnyOrder("foo", "bar");
         y.expectedBodiesReceivedInAnyOrder("foo", "bar");
@@ -58,12 +58,12 @@ public class WireTapNoCacheTest extends ContextTestSupport {
 
         assertEquals(4, context.getEndpointRegistry().size());
 
-        sendBody("foo", "mock:x");
-        sendBody("foo", "mock:y");
-        sendBody("foo", "mock:z");
-        sendBody("bar", "mock:x");
-        sendBody("bar", "mock:y");
-        sendBody("bar", "mock:z");
+        sendBody("foo", "mock:x2");
+        sendBody("foo", "mock:y2");
+        sendBody("foo", "mock:z2");
+        sendBody("bar", "mock:x2");
+        sendBody("bar", "mock:y2");
+        sendBody("bar", "mock:z2");
 
         // should not register as new endpoint so we keep at 4
         sendBody("dummy", "mock:dummy");


[camel] 10/11: CAMEL-14596: Regen

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9efd2fc21143d3f38b44368aea94b7134e5ea7dc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 11:48:36 2020 +0100

    CAMEL-14596: Regen
---
 .../modules/ROOT/pages/dynamic-router.adoc         | 36 ++++++++++------------
 .../modules/ROOT/pages/dynamicRouter-eip.adoc      |  4 ++-
 .../user-manual/modules/ROOT/pages/enrich-eip.adoc |  4 ++-
 .../modules/ROOT/pages/pollEnrich-eip.adoc         |  4 ++-
 .../modules/ROOT/pages/recipientList-eip.adoc      |  4 ++-
 .../modules/ROOT/pages/routingSlip-eip.adoc        |  3 +-
 .../modules/ROOT/pages/wireTap-eip.adoc            |  4 ++-
 7 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/docs/user-manual/modules/ROOT/pages/dynamic-router.adoc b/docs/user-manual/modules/ROOT/pages/dynamic-router.adoc
index bbd3a2b..12aa022 100644
--- a/docs/user-manual/modules/ROOT/pages/dynamic-router.adoc
+++ b/docs/user-manual/modules/ROOT/pages/dynamic-router.adoc
@@ -19,26 +19,6 @@ You must ensure the expression used for the `dynamicRouter` such as a
 bean, will return `null` to indicate the end. Otherwise the
 `dynamicRouter` will keep repeating endlessly.
 
-[[DynamicRouter-Options]]
-== Options
-
-[width="100%",cols="10%,10%,80%",options="header",]
-|=======================================================================
-|Name |Default Value |Description
-
-|`uriDelimiter` |`,` |Delimiter used if the Expression returned multiple
-endpoints.
-
-|`ignoreInvalidEndpoints` |`false` |If an endpoint uri could not be resolved, should it be ignored.
-Otherwise Camel will thrown an exception stating the endpoint uri is not
-valid.
-
-|`cacheSize` |`1000` |Allows to configure the cache size for the
-`ProducerCache` which caches producers for reuse in the routing slip.
-Will by default use the default cache size which is 1000. Setting the
-value to -1 allows to turn off the cache all together.
-|=======================================================================
-
 [[DynamicRouter-DynamicRouterinCamel2.5onwards]]
 == Dynamic Router in Camel 2.5 onwards
 
@@ -49,6 +29,22 @@ allows you to know how far we have processed in the slip. (It's a slip
 because the Dynamic Router implementation is
 based on top of Routing Slip).
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
+== Options
+
+// eip options: START
+The Dynamic Router EIP supports 3 options which are listed below:
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *uriDelimiter* | Sets the uri delimiter to use | , | String
+| *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and  [...]
+|===
+// eip options: END
+
 [[DynamicRouter-JavaDSL]]
 == Java DSL
 
diff --git a/docs/user-manual/modules/ROOT/pages/dynamicRouter-eip.adoc b/docs/user-manual/modules/ROOT/pages/dynamicRouter-eip.adoc
index 56b940b..6f04383 100644
--- a/docs/user-manual/modules/ROOT/pages/dynamicRouter-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/dynamicRouter-eip.adoc
@@ -16,6 +16,8 @@ _on-the-fly_.
 You must ensure the expression used for the `dynamicRouter` such as a bean, will return `null` to indicate the end. Otherwise the `dynamicRouter` will keep repeating endlessly.
 ===
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -26,7 +28,7 @@ The Dynamic Router EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 |===
 // eip options: END
 
diff --git a/docs/user-manual/modules/ROOT/pages/enrich-eip.adoc b/docs/user-manual/modules/ROOT/pages/enrich-eip.adoc
index 6a4ad77..e2a9eb9 100644
--- a/docs/user-manual/modules/ROOT/pages/enrich-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/enrich-eip.adoc
@@ -6,6 +6,8 @@ Camel supports the Content Enricher from the EIP patterns using a Message Transl
 
 image::eip/DataEnricher.gif[image]
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 // eip options: START
 The Enrich EIP supports 7 options which are listed below:
 
@@ -17,7 +19,7 @@ The Enrich EIP supports 7 options which are listed below:
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduc [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/docs/user-manual/modules/ROOT/pages/pollEnrich-eip.adoc b/docs/user-manual/modules/ROOT/pages/pollEnrich-eip.adoc
index d0b5418..8b4d65b 100644
--- a/docs/user-manual/modules/ROOT/pages/pollEnrich-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/pollEnrich-eip.adoc
@@ -23,6 +23,8 @@ The timeout values is in millis.
 `pollEnrich` does *not* access any data from the current Exchange which means when polling it cannot use any of the existing headers you may have set on the Exchange. For example you cannot set a filename in the `Exchange.FILE_NAME` header and use `pollEnrich` to consume only that file. For that you *must* set the filename in the endpoint URI.
 Both `enrich` and `pollEnrich` supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. In other words all what is told above no longer apply and it just works.
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Content enrichment using pollEnrich
 The `pollEnrich` works just as the `enrich` however as it uses a Polling Consumer we have 3 methods when polling
 
@@ -43,7 +45,7 @@ The Poll Enrich EIP supports 7 options which are listed below:
 | *strategyMethodName* | This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. |  | String
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This redu [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/docs/user-manual/modules/ROOT/pages/recipientList-eip.adoc b/docs/user-manual/modules/ROOT/pages/recipientList-eip.adoc
index 2bfdef5..eec1ef3 100644
--- a/docs/user-manual/modules/ROOT/pages/recipientList-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/recipientList-eip.adoc
@@ -7,6 +7,8 @@ image::eip/RecipientList.gif[image]
 
 The recipients will receive a copy of the *same* Exchange, and Camel will execute them sequentially.
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -27,7 +29,7 @@ The Recipient List EIP supports 15 options which are listed below:
 | *timeout* | Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shu [...]
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 | *parallelAggregate* | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. | false | Boolean
 | *stopOnAggregateException* | If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this behavior. The default value is false for the sake of backward compatibility. | false | Boolean
 |===
diff --git a/docs/user-manual/modules/ROOT/pages/routingSlip-eip.adoc b/docs/user-manual/modules/ROOT/pages/routingSlip-eip.adoc
index 911f77f..ca4e70d 100644
--- a/docs/user-manual/modules/ROOT/pages/routingSlip-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/routingSlip-eip.adoc
@@ -5,6 +5,7 @@ The Routing Slip from the https://camel.apache.org/enterprise-integration-patter
 
 image::eip/RoutingTableSimple.gif[image]
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
 
 == Options
 
@@ -16,7 +17,7 @@ The Routing Slip EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and  [...]
 |===
 // eip options: END
 
diff --git a/docs/user-manual/modules/ROOT/pages/wireTap-eip.adoc b/docs/user-manual/modules/ROOT/pages/wireTap-eip.adoc
index c5af29a..b7319ad 100644
--- a/docs/user-manual/modules/ROOT/pages/wireTap-eip.adoc
+++ b/docs/user-manual/modules/ROOT/pages/wireTap-eip.adoc
@@ -16,6 +16,8 @@ should consider enabling xref:stream-caching.adoc[Stream caching] to
 ensure the message body can be read at each endpoint. See more details
 at xref:stream-caching.adoc[Stream caching].
 
+TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used.
+
 == Options
 
 // eip options: START
@@ -32,7 +34,7 @@ The Wire Tap EIP supports 11 options which are listed below:
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse producers. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped an [...]
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===


[camel] 04/11: CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 94910c6b9172fd12e509f3be4752334dd0c56fdc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:18:22 2020 +0100

    CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../java/org/apache/camel/processor/Enricher.java  | 55 +++++++++++--
 .../org/apache/camel/processor/PollEnricher.java   | 46 ++++++++++-
 .../org/apache/camel/reifier/EnrichReifier.java    |  3 +
 .../apache/camel/processor/EnrichNoCacheTest.java  | 91 ++++++++++++++++++++++
 .../camel/processor/PollEnrichNoCacheTest.java     | 85 ++++++++++++++++++++
 .../org/apache/camel/support/ExchangeHelper.java   |  6 ++
 6 files changed, 276 insertions(+), 10 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index c038f98..d61550c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.impl.engine.DefaultProducerCache;
 import org.apache.camel.impl.engine.EmptyProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
@@ -176,10 +177,18 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
         // use dynamic endpoint so calculate the endpoint to use
         Object recipient = null;
+        boolean prototype = cacheSize < 0;
         try {
             recipient = expression.evaluate(exchange, Object.class);
-            endpoint = resolveEndpoint(exchange, recipient);
-            // acquire the consumer from the cache
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
+            // acquire the producer from the cache
             producer = producerCache.acquireProducer(endpoint);
         } catch (Throwable e) {
             if (isIgnoreInvalidEndpoint()) {
@@ -203,10 +212,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         }
         // record timing for sending the exchange using the producer
         final StopWatch watch = sw;
+        final boolean prototypeEndpoint = prototype;
         AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
         boolean sync = ap.process(resourceExchange, new AsyncCallback() {
             public void done(boolean doneSync) {
-                // we only have to handle async completion of the routing slip
+                // we only have to handle async completion
                 if (doneSync) {
                     return;
                 }
@@ -249,6 +259,10 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                 } catch (Exception e) {
                     // ignore
                 }
+                // and stop prototype endpoints
+                if (prototypeEndpoint) {
+                    ServiceHelper.stopAndShutdownService(endpoint);
+                }
 
                 callback.done(false);
             }
@@ -306,17 +320,46 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         } catch (Exception e) {
             // ignore
         }
+        // and stop prototype endpoints
+        if (prototypeEndpoint) {
+            ServiceHelper.stopAndShutdownService(endpoint);
+        }
 
         callback.done(true);
         return true;
     }
 
-    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
-            recipient = ((String)recipient).trim();
+            recipient = ((String) recipient).trim();
+        } else if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient != null) {
+            // convert to a string type we can work with
+            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+        }
+
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 44e8757..de1fea9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.engine.DefaultConsumerCache;
 import org.apache.camel.spi.ConsumerCache;
@@ -209,9 +210,17 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
 
         // use dynamic endpoint so calculate the endpoint to use
         Object recipient = null;
+        boolean prototype = cacheSize < 0;
         try {
             recipient = expression.evaluate(exchange, Object.class);
-            endpoint = resolveEndpoint(exchange, recipient);
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
             // acquire the consumer from the cache
             consumer = consumerCache.acquirePollingConsumer(endpoint);
         } catch (Throwable e) {
@@ -266,6 +275,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         } finally {
             // return the consumer back to the cache
             consumerCache.releasePollingConsumer(endpoint, consumer);
+            // and stop prototype endpoints
+            if (prototype) {
+                ServiceHelper.stopAndShutdownService(endpoint);
+            }
         }
 
         // remember current redelivery stats
@@ -332,12 +345,37 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         return true;
     }
 
-    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
-            recipient = ((String)recipient).trim();
+            recipient = ((String) recipient).trim();
+        } else if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient != null) {
+            // convert to a string type we can work with
+            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+        }
+
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index e353f0c..109ffc1 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -41,6 +41,9 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
         Enricher enricher = new Enricher(exp);
         enricher.setShareUnitOfWork(isShareUnitOfWork);
         enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
+        if (definition.getCacheSize() != null) {
+            enricher.setCacheSize(parseInt(definition.getCacheSize()));
+        }
         AggregationStrategy strategy = createAggregationStrategy();
         if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
new file mode 100644
index 0000000..c37262a
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class EnrichNoCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        Enricher ep = (Enricher) list.get(0);
+        assertNotNull(ep);
+        assertEquals(-1, ep.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
+
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "mock:dummy");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a")
+                        .enrich().header("myHeader").cacheSize(-1).end().id("foo");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
new file mode 100644
index 0000000..3f323f2
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class PollEnrichNoCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo", "seda:x");
+        sendBody("foo", "seda:y");
+        sendBody("foo", "seda:z");
+        sendBody("bar", "seda:x");
+        sendBody("bar", "seda:y");
+        sendBody("bar", "seda:z");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        PollEnricher ep = (PollEnricher) list.get(0);
+        assertNotNull(ep);
+        assertEquals(-1, ep.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again and create endpoints
+        template.sendBody("seda:x", "x");
+        template.sendBody("seda:y", "y");
+        template.sendBody("seda:z", "z");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "seda:x");
+        sendBody("foo", "seda:y");
+        sendBody("foo", "seda:z");
+        sendBody("bar", "seda:x");
+        sendBody("bar", "seda:y");
+        sendBody("bar", "seda:z");
+
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "seda:dummy");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a")
+                        .pollEnrich().header("myHeader").timeout(0).cacheSize(-1).end().id("foo");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index bfac356..e208448 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -89,6 +89,9 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        if (value == null) {
+            throw new NoSuchEndpointException("null");
+        }
         Endpoint endpoint;
         if (value instanceof Endpoint) {
             endpoint = (Endpoint) value;
@@ -110,6 +113,9 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        if (value == null) {
+            throw new NoSuchEndpointException("null");
+        }
         Endpoint endpoint;
         if (value instanceof Endpoint) {
             endpoint = (Endpoint) value;


[camel] 08/11: CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3c031d83623da7603eb582157ac46802421744ae
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 11:20:39 2020 +0100

    CAMEL-14596: Move new API to ExtendedCamelContext as its not for end users
---
 .../src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
index c4523f7..8cbb681 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/EndpointPrototypeTest.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.support.LifecycleStrategySupport;
 import org.junit.Test;
@@ -44,7 +45,7 @@ public class EndpointPrototypeTest extends ContextTestSupport {
 
         // now get a prototype which should not be added
 
-        Endpoint prototype = context.getPrototypeEndpoint("mock:bar");
+        Endpoint prototype = context.adapt(ExtendedCamelContext.class).getPrototypeEndpoint("mock:bar");
         assertNotNull(prototype);
 
         // and should be started
@@ -88,7 +89,7 @@ public class EndpointPrototypeTest extends ContextTestSupport {
 
         // now get a prototype which should not be added
 
-        Endpoint prototype = context.getPrototypeEndpoint("mock:bar");
+        Endpoint prototype = context.adapt(ExtendedCamelContext.class).getPrototypeEndpoint("mock:bar");
         assertNotNull(prototype);
 
         // and should be started


[camel] 03/11: CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9990d584352d42b304374e0c278ecc51768e8068
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:01:14 2020 +0100

    CAMEL-14596: camel-core - Optimize RoutingSlip/DynamicRouter when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../org/apache/camel/processor/RoutingSlip.java    | 82 +++++++++++++++-------
 ...acheTest.java => DynamicRouterNoCacheTest.java} | 61 +++++++---------
 .../camel/processor/RecipientListNoCacheTest.java  | 56 +++++++--------
 .../camel/processor/RoutingSlipNoCacheTest.java    | 22 +++++-
 ...istNoCacheTest.java => WireTapNoCacheTest.java} | 75 ++++++++------------
 5 files changed, 159 insertions(+), 137 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index cb31c0c..d8218d5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.engine.DefaultProducerCache;
 import org.apache.camel.impl.engine.EmptyProducerCache;
@@ -35,7 +36,6 @@ import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.ObjectHelper;
@@ -241,9 +241,19 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
         }
 
         while (iter.hasNext(current)) {
+
+            boolean prototype = cacheSize < 0;
             Endpoint endpoint;
             try {
-                endpoint = resolveEndpoint(iter, exchange);
+                Object recipient = iter.next(exchange);
+                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                if (existing == null) {
+                    endpoint = resolveEndpoint(exchange, recipient, prototype);
+                } else {
+                    endpoint = existing;
+                    // we have an existing endpoint then its not a prototype scope
+                    prototype = false;
+                }
                 // if no endpoint was resolved then try the next
                 if (endpoint == null) {
                     continue;
@@ -255,7 +265,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
             }
 
             //process and prepare the routing slip
-            boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter);
+            boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter, prototype);
             current = prepareExchangeForRoutingSlip(current, endpoint);
             
             if (!sync) {
@@ -305,14 +315,28 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
         return true;
     }
 
-    protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
-        Object nextRecipient = iter.next(exchange);
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws Exception {
         Endpoint endpoint = null;
         try {
-            endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
+            endpoint = prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
         } catch (Exception e) {
             if (isIgnoreInvalidEndpoints()) {
-                LOG.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
+                LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
             } else {
                 throw e;
             }
@@ -355,7 +379,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
     }
 
     protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
-                                      final AsyncCallback originalCallback, final RoutingSlipIterator iter) {
+                                      final AsyncCallback originalCallback, final RoutingSlipIterator iter, final boolean prototype) {
 
         // this does the actual processing so log at trace level
         if (LOG.isTraceEnabled()) {
@@ -389,6 +413,10 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
 
                     // we only have to handle async completion of the routing slip
                     if (doneSync) {
+                        // and stop prototype endpoints
+                        if (prototype) {
+                            ServiceHelper.stopAndShutdownService(endpoint);
+                        }
                         cb.done(true);
                         return;
                     }
@@ -416,11 +444,20 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                                 break;
                             }
 
-                            Endpoint endpoint1;
+                            Endpoint nextEndpoint;
+                            boolean prototype = cacheSize < 0;
                             try {
-                                endpoint1 = resolveEndpoint(iter, ex);
+                                Object recipient = iter.next(ex);
+                                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                                if (existing == null) {
+                                    nextEndpoint = resolveEndpoint(exchange, recipient, prototype);
+                                } else {
+                                    nextEndpoint = existing;
+                                    // we have an existing endpoint then its not a prototype scope
+                                    prototype = false;
+                                }
                                 // if no endpoint was resolved then try the next
-                                if (endpoint1 == null) {
+                                if (nextEndpoint == null) {
                                     continue;
                                 }
                             } catch (Exception e) {
@@ -430,8 +467,16 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
                             }
 
                             // prepare and process the routing slip
-                            boolean sync = processExchange(endpoint1, current, original, cb, iter);
-                            current = prepareExchangeForRoutingSlip(current, endpoint1);
+                            final boolean prototypeEndpoint = prototype;
+                            AsyncCallback cbNext = (doneNext) -> {
+                                // and stop prototype endpoints
+                                if (prototypeEndpoint) {
+                                    ServiceHelper.stopAndShutdownService(nextEndpoint);
+                                }
+                                cb.done(doneNext);
+                            };
+                            boolean sync = processExchange(nextEndpoint, current, original, cbNext, iter, prototype);
+                            current = prepareExchangeForRoutingSlip(current, nextEndpoint);
 
                             if (!sync) {
                                 if (LOG.isTraceEnabled()) {
@@ -499,17 +544,6 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
     }
 
     /**
-     * Copy the outbound data in 'source' to the inbound data in 'result'.
-     */
-    private void copyOutToIn(Exchange result, Exchange source) {
-        result.setException(source.getException());
-        result.setIn(getResultMessage(source));
-
-        result.getProperties().clear();
-        result.getProperties().putAll(source.getProperties());
-    }
-
-    /**
      * Creates the embedded processor to use when wrapping this routing slip in an error handler.
      */
     public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
similarity index 61%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
index f396c42..16f188b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/DynamicRouterNoCacheTest.java
@@ -17,50 +17,21 @@
 package org.apache.camel.processor;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Headers;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
 import org.junit.Test;
 
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class DynamicRouterNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
-
-        sendBody("foo");
-        sendBody("bar");
-
-        assertMockEndpointsSatisfied();
-
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
-
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
-
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
-
-    @Test
-    public void testNoCacheTwo() throws Exception {
         assertEquals(1, context.getEndpointRegistry().size());
 
         sendBody("foo");
@@ -68,7 +39,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
+        DynamicRouter rl = (DynamicRouter) list.get(0);
         assertNotNull(rl);
         assertEquals(-1, rl.getCacheSize());
 
@@ -96,12 +67,28 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
     }
 
+    public String slip(@Headers Map headers) {
+        String header = (String) headers.get("recipientListHeader");
+        if (ObjectHelper.isEmpty(header)) {
+            return null;
+        }
+        if (header.contains(",")) {
+            String next = StringHelper.before(header, ",");
+            String rest = StringHelper.after(header, ",");
+            headers.put("recipientListHeader", rest);
+            return next;
+        } else {
+            // last slip
+            headers.put("recipientListHeader", "");
+            return header;
+        }
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a")
-                        .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+                from("direct:a").dynamicRouter(method(DynamicRouterNoCacheTest.class, "slip")).cacheSize(-1).id("foo");
             }
         };
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f396c42..a97b2d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -31,51 +31,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
+        assertEquals(1, context.getEndpointRegistry().size());
 
         sendBody("foo");
         sendBody("bar");
 
-        assertMockEndpointsSatisfied();
-
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
         RecipientList rl = (RecipientList) list.get(0);
         assertNotNull(rl);
         assertEquals(-1, rl.getCacheSize());
 
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
 
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
+        // now send again with mocks which then add endpoints
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
 
-    @Test
-    public void testNoCacheTwo() throws Exception {
-        assertEquals(1, context.getEndpointRegistry().size());
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
 
         sendBody("foo");
         sendBody("bar");
 
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
+        assertMockEndpointsSatisfied();
 
-        // check no additional endpoints added as cache was disabled
-        assertEquals(1, context.getEndpointRegistry().size());
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
 
-        // now send again with mocks which then add endpoints
+    @Test
+    public void testNoThreadPool() throws Exception {
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
@@ -89,7 +77,19 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
-        assertEquals(4, context.getEndpointRegistry().size());
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
+        assertNotNull(pc);
+        assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+        // and no thread pool is in use as timeout is 0
+        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
+        assertNull(pc);
     }
 
     protected void sendBody(String body) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
index 752916e..e1e92fb 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.processor;
 
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
@@ -25,6 +28,21 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo");
+        sendBody("bar");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RoutingSlip rl = (RoutingSlip) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
@@ -37,6 +55,8 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
         sendBody("bar");
 
         assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
     }
 
     protected void sendBody(String body) {
@@ -47,7 +67,7 @@ public class RoutingSlipNoCacheTest extends ContextTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(0);
+                from("direct:a").routingSlip(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
             }
         };
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
similarity index 50%
copy from core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
index f396c42..44e3dc9 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapNoCacheTest.java
@@ -17,83 +17,64 @@
 package org.apache.camel.processor;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.engine.EmptyProducerCache;
-import org.apache.camel.util.ReflectionHelper;
 import org.junit.Test;
 
-public class RecipientListNoCacheTest extends ContextTestSupport {
+public class WireTapNoCacheTest extends ContextTestSupport {
 
     @Test
     public void testNoCache() throws Exception {
-        MockEndpoint x = getMockEndpoint("mock:x");
-        MockEndpoint y = getMockEndpoint("mock:y");
-        MockEndpoint z = getMockEndpoint("mock:z");
-
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
-
-        sendBody("foo");
-        sendBody("bar");
-
-        assertMockEndpointsSatisfied();
-
-        // make sure its using an empty producer cache as the cache is disabled
-        List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
-
-        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
-        assertNotNull(pc);
-        assertIsInstanceOf(EmptyProducerCache.class, pc);
-
-        // and no thread pool is in use as timeout is 0
-        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"), rl);
-        assertNull(pc);
-    }
-
-    @Test
-    public void testNoCacheTwo() throws Exception {
         assertEquals(1, context.getEndpointRegistry().size());
 
-        sendBody("foo");
-        sendBody("bar");
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
 
         // make sure its using an empty producer cache as the cache is disabled
         List<Processor> list = context.getRoute("route1").filter("foo");
-        RecipientList rl = (RecipientList) list.get(0);
-        assertNotNull(rl);
-        assertEquals(-1, rl.getCacheSize());
+        WireTapProcessor wtp = (WireTapProcessor) list.get(0);
+        assertNotNull(wtp);
+        assertEquals(-1, wtp.getCacheSize());
 
         // check no additional endpoints added as cache was disabled
         assertEquals(1, context.getEndpointRegistry().size());
 
         // now send again with mocks which then add endpoints
+
         MockEndpoint x = getMockEndpoint("mock:x");
         MockEndpoint y = getMockEndpoint("mock:y");
         MockEndpoint z = getMockEndpoint("mock:z");
 
-        x.expectedBodiesReceived("foo", "bar");
-        y.expectedBodiesReceived("foo", "bar");
-        z.expectedBodiesReceived("foo", "bar");
+        x.expectedBodiesReceivedInAnyOrder("foo", "bar");
+        y.expectedBodiesReceivedInAnyOrder("foo", "bar");
+        z.expectedBodiesReceivedInAnyOrder("foo", "bar");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
 
-        sendBody("foo");
-        sendBody("bar");
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "mock:dummy");
 
         assertMockEndpointsSatisfied();
 
         assertEquals(4, context.getEndpointRegistry().size());
     }
 
-    protected void sendBody(String body) {
-        template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
     }
 
     @Override
@@ -101,7 +82,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:a")
-                        .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo");
+                        .wireTap("${header.myHeader}").cacheSize(-1).id("foo").end();
             }
         };
 


[camel] 05/11: CAMEL-14596: Update cache size documentation

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8cea4e7503f783b1142f99024977598e9f9b8be5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:42:50 2020 +0100

    CAMEL-14596: Update cache size documentation
---
 .../org/apache/camel/model/dynamicRouter.json      |  2 +-
 .../resources/org/apache/camel/model/enrich.json   |  2 +-
 .../org/apache/camel/model/pollEnrich.json         |  2 +-
 .../org/apache/camel/model/recipientList.json      |  2 +-
 .../org/apache/camel/model/routingSlip.json        |  2 +-
 .../camel/model/DynamicRouterDefinition.java       | 40 ++++++++++++++++++++++
 .../org/apache/camel/model/EnrichDefinition.java   | 40 ++++++++++++++++++++++
 .../apache/camel/model/PollEnrichDefinition.java   | 40 ++++++++++++++++++++++
 .../camel/model/RecipientListDefinition.java       | 40 ++++++++++++++++++++++
 .../apache/camel/model/RoutingSlipDefinition.java  | 26 ++++++++++++++
 .../org/apache/camel/model/WireTapDefinition.java  | 26 ++++++++++++++
 11 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/dynamicRouter.json b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/dynamicRouter.json
index be30ae6..75eaf43 100644
--- a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/dynamicRouter.json
+++ b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/dynamicRouter.json
@@ -14,7 +14,7 @@
     "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "exchangeProperty", "groovy", "header", "hl7terser", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "secret": false, "description": "Expression to call that returns the endpoint(s) to route to in the [...]
     "uriDelimiter": { "kind": "attribute", "displayName": "Uri Delimiter", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": ",", "description": "Sets the uri delimiter to use" },
     "ignoreInvalidEndpoints": { "kind": "attribute", "displayName": "Ignore Invalid Endpoints", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
-    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused." },
+    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then i [...]
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/enrich.json b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/enrich.json
index 70eaf77..d199764 100644
--- a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/enrich.json
+++ b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/enrich.json
@@ -17,7 +17,7 @@
     "strategyMethodAllowNull": { "kind": "attribute", "displayName": "Strategy Method Allow Null", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy." },
     "aggregateOnException": { "kind": "attribute", "displayName": "Aggregate On Exception", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. F [...]
     "shareUnitOfWork": { "kind": "attribute", "displayName": "Share Unit Of Work", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individual unit of work." },
-    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused." },
+    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by set [...]
     "ignoreInvalidEndpoint": { "kind": "attribute", "displayName": "Ignore Invalid Endpoint", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/pollEnrich.json b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/pollEnrich.json
index 62a03a8..77d8d01 100644
--- a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/pollEnrich.json
+++ b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/pollEnrich.json
@@ -17,7 +17,7 @@
     "strategyMethodName": { "kind": "attribute", "displayName": "Strategy Method Name", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy." },
     "strategyMethodAllowNull": { "kind": "attribute", "displayName": "Strategy Method Allow Null", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy." },
     "aggregateOnException": { "kind": "attribute", "displayName": "Aggregate On Exception", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. F [...]
-    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused." },
+    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by se [...]
     "ignoreInvalidEndpoint": { "kind": "attribute", "displayName": "Ignore Invalid Endpoint", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/recipientList.json b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/recipientList.json
index d9c0950..f943560 100644
--- a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/recipientList.json
+++ b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/recipientList.json
@@ -24,7 +24,7 @@
     "timeout": { "kind": "attribute", "displayName": "Timeout", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "0", "description": "Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAware [...]
     "onPrepareRef": { "kind": "attribute", "displayName": "On Prepare Ref", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send." },
     "shareUnitOfWork": { "kind": "attribute", "displayName": "Share Unit Of Work", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work." },
-    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused." },
+    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then i [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel Aggregate", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method [...]
     "stopOnAggregateException": { "kind": "attribute", "displayName": "Stop On Aggregate Exception", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this b [...]
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the id of this node" },
diff --git a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/routingSlip.json b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/routingSlip.json
index ba01f3b..d0b3fac 100644
--- a/core/camel-core-engine/src/generated/resources/org/apache/camel/model/routingSlip.json
+++ b/core/camel-core-engine/src/generated/resources/org/apache/camel/model/routingSlip.json
@@ -14,7 +14,7 @@
     "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "exchangeProperty", "groovy", "header", "hl7terser", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "secret": false, "description": "Expression to define the routing slip, which defines which endpoin [...]
     "uriDelimiter": { "kind": "attribute", "displayName": "Uri Delimiter", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": ",", "description": "Sets the uri delimiter to use" },
     "ignoreInvalidEndpoints": { "kind": "attribute", "displayName": "Ignore Invalid Endpoints", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
-    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused." },
+    "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then i [...]
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index cef0095..620595c 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -148,6 +148,19 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers when using this dynamic router, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -157,4 +170,31 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the
+     * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
+     * reuse producers when using this dynamic router, when uris are reused.
+     *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
+     * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
+     *            or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public DynamicRouterDefinition<Type> cacheSize(String cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
 }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/EnrichDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 13ad4ad..7210bc2 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -158,6 +158,19 @@ public class EnrichDefinition extends ExpressionNode {
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producer when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -168,6 +181,33 @@ public class EnrichDefinition extends ExpressionNode {
     }
 
     /**
+     * Sets the maximum size used by the
+     * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
+     * reuse producer when uris are reused.
+     *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
+     * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
+     *            or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public EnrichDefinition cacheSize(String cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
+    /**
      * Ignore the invalidate endpoint exception when try to create a producer
      * with that endpoint
      *
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 8fdda8e..ea15e03 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -172,6 +172,19 @@ public class PollEnrichDefinition extends ExpressionNode {
      * {@link org.apache.camel.spi.ConsumerCache} which is used to cache and
      * reuse consumers when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -182,6 +195,33 @@ public class PollEnrichDefinition extends ExpressionNode {
     }
 
     /**
+     * Sets the maximum size used by the
+     * {@link org.apache.camel.spi.ConsumerCache} which is used to cache and
+     * reuse consumers when uris are reused.
+     *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
+     * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
+     *            or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public PollEnrichDefinition cacheSize(String cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
+    /**
      * Ignore the invalidate endpoint exception when try to create a producer
      * with that endpoint
      *
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 098e6a0..a9a4076 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -384,6 +384,19 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers when using this recipient list, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -393,6 +406,33 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the
+     * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
+     * reuse producers when using this recipient list, when uris are reused.
+     *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
+     * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
+     *            or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> cacheSize(String cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
 
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index 55c149e..5992be8 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -183,6 +183,19 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers when using this routing slip, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -196,6 +209,19 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers when using this routing slip, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/WireTapDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 9a80a65..6a21ad4 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -282,6 +282,19 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder
@@ -296,6 +309,19 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
      * {@link org.apache.camel.spi.ProducerCache} which is used to cache and
      * reuse producers, when uris are reused.
      *
+     * Beware that when using dynamic endpoints then it affects how well the cache can be utilized.
+     * If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which
+     * allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped
+     * and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints
+     * are stored in memory in the caches.
+     *
+     * However if there are a high degree of dynamic endpoints that have been used before, then it can
+     * benefit to use the cache to reuse both producers and endpoints and therefore the cache size
+     * can be set accordingly or rely on the default size (1000).
+     *
+     * If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size
+     * can help reduce memory usage to avoid storing too many non frequent used producers.
+     *
      * @param cacheSize the cache size, use <tt>0</tt> for default cache size,
      *            or <tt>-1</tt> to turn cache off.
      * @return the builder


[camel] 07/11: CAMEL-14596: Update upgrade guide

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5d096514aea67d84a3e6bf87445d5d2854a899da
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:56:03 2020 +0100

    CAMEL-14596: Update upgrade guide
---
 .../modules/ROOT/pages/camel-3x-upgrade-guide.adoc         | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc
index 4900e85..6c9c80a 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide.adoc
@@ -497,7 +497,7 @@ and
             <to uri="mock:a"/>
         </multicast>
     </route>
- ----
+----
 should be rewritten as:
 [source,xml]
 ----
@@ -507,4 +507,14 @@ should be rewritten as:
             <to uri="mock:a"/>
         </multicast>
     </route>
- ----
+----
+
+== Upgrading Camel 3.1 to 3.2
+
+=== EIPs with cacheSize option
+
+The `cacheSize` option on EIPs has been improved to reduce memory usage when the cache is disabled by
+setting the value to -1. One of the optimizations is that new endpoints will not added to the endpoint registry,
+but discarded after use. This avoids storing endpoints in the cache (memory) as the cache was disabled.
+
+See more details in the documentation for the `cacheSize` option on the EIPs.


[camel] 02/11: CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1859881d17a420384037531a55f6b7623d8c1c16
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 04:56:57 2020 +0100

    CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../org/apache/camel/processor/RecipientList.java  |  1 +
 .../camel/processor/RecipientListProcessor.java    | 61 +++++++++++++++++++---
 .../camel/processor/RecipientListNoCacheTest.java  | 33 ++++++++++++
 3 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index 4903429..f473dfc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -193,6 +193,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
                 isStopOnAggregateException());
         rlp.setAggregateExecutorService(aggregateExecutorService);
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+        rlp.setCacheSize(getCacheSize());
         rlp.setId(getId());
         rlp.setRouteId(getRouteId());
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index fb5b7c9..3eb6352 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,6 +32,7 @@ import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.ProducerCache;
@@ -66,6 +67,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     private final Iterator<?> iter;
     private boolean ignoreInvalidEndpoints;
     private ProducerCache producerCache;
+    private int cacheSize;
 
     /**
      * Class that represent each step in the recipient list to do
@@ -82,9 +84,10 @@ public class RecipientListProcessor extends MulticastProcessor {
         private final ProducerCache producerCache;
         private final ExchangePattern pattern;
         private volatile ExchangePattern originalPattern;
+        private final boolean prototypeEndpoint;
 
         private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
-                                               Processor prepared, Exchange exchange, ExchangePattern pattern) {
+                                               Processor prepared, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
             this.index = index;
             this.producerCache = producerCache;
             this.endpoint = endpoint;
@@ -92,6 +95,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             this.prepared = prepared;
             this.exchange = exchange;
             this.pattern = pattern;
+            this.prototypeEndpoint = prototypeEndpoint;
         }
 
         @Override
@@ -139,6 +143,10 @@ public class RecipientListProcessor extends MulticastProcessor {
                 }
                 // when we are done we should release back in pool
                 producerCache.releaseProducer(endpoint, producer);
+                // and stop prototype endpoints
+                if (prototypeEndpoint) {
+                    ServiceHelper.stopAndShutdownService(endpoint);
+                }
             } catch (Exception e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
@@ -148,6 +156,8 @@ public class RecipientListProcessor extends MulticastProcessor {
 
     }
 
+    // TODO: camel-bean @RecipientList cacheSize
+
     public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter) {
         super(camelContext, null);
         this.producerCache = producerCache;
@@ -169,6 +179,14 @@ public class RecipientListProcessor extends MulticastProcessor {
         this.iter = iter;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     public boolean isIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
@@ -185,12 +203,21 @@ public class RecipientListProcessor extends MulticastProcessor {
         // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
         int index = 0;
         while (iter.hasNext()) {
+            boolean prototype = cacheSize < 0;
+
             Object recipient = iter.next();
             Endpoint endpoint;
             Producer producer;
             ExchangePattern pattern;
             try {
-                endpoint = resolveEndpoint(exchange, recipient);
+                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                if (existing == null) {
+                    endpoint = resolveEndpoint(exchange, recipient, prototype);
+                } else {
+                    endpoint = existing;
+                    // we have an existing endpoint then its not a prototype scope
+                    prototype = false;
+                }
                 pattern = resolveExchangePattern(recipient);
                 producer = producerCache.acquireProducer(endpoint);
             } catch (Exception e) {
@@ -206,7 +233,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             }
 
             // then create the exchange pair
-            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
+            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype));
         }
 
         return result;
@@ -215,7 +242,8 @@ public class RecipientListProcessor extends MulticastProcessor {
     /**
      * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
      */
-    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer,
+                                                                Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
         Processor prepared = producer;
 
         // copy exchange, and do not share the unit of work
@@ -243,7 +271,7 @@ public class RecipientListProcessor extends MulticastProcessor {
         }
 
         // and create the pair
-        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
+        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern, prototypeEndpoint);
     }
 
     @Override
@@ -263,12 +291,31 @@ public class RecipientListProcessor extends MulticastProcessor {
         return answer;
     }
 
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
             recipient = ((String) recipient).trim();
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
+        }
     }
 
     protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f4a9575..f396c42 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -59,6 +59,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         assertNull(pc);
     }
 
+    @Test
+    public void testNoCacheTwo() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo");
+        sendBody("bar");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
     protected void sendBody(String body) {
         template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
     }