You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/10 11:26:32 UTC

[camel] branch master updated (dbe99d2 -> 1e25f3a)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from dbe99d2  Regen for commit 3f0c9afa0ee30e62f1aea8a6dd08412b8aadbb9b
     new 98fceb9  CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.
     new 1e25f3a  CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/catalog/models/enrich.json    |   1 +
 .../resources/org/apache/camel/model/enrich.json   |   1 +
 .../org/apache/camel/model/EnrichDefinition.java   |  31 +++
 .../apache/camel/model/ToDynamicDefinition.java    |  11 +-
 .../java/org/apache/camel/processor/Enricher.java  | 233 +++------------------
 .../org/apache/camel/reifier/EnrichReifier.java    |   3 +
 .../java/org/apache/camel/xml/in/ModelParser.java  |   1 +
 .../dsl/yaml/deserializers/ModelDeserializers.java |   6 +
 .../src/generated/resources/camel-yaml-dsl.json    |   3 +
 9 files changed, 71 insertions(+), 219 deletions(-)

[camel] 02/02: CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1e25f3a5b183eb64962b791e0336c99b8fc24bfd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Apr 10 10:59:46 2021 +0200

    CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.
---
 .../org/apache/camel/catalog/models/enrich.json    |  1 +
 .../resources/org/apache/camel/model/enrich.json   |  1 +
 .../org/apache/camel/model/EnrichDefinition.java   | 31 ++++++++++++++++++++++
 .../apache/camel/model/ToDynamicDefinition.java    | 11 +-------
 .../java/org/apache/camel/processor/Enricher.java  | 10 +++++++
 .../org/apache/camel/reifier/EnrichReifier.java    |  3 +++
 .../java/org/apache/camel/xml/in/ModelParser.java  |  1 +
 .../dsl/yaml/deserializers/ModelDeserializers.java |  6 +++++
 .../src/generated/resources/camel-yaml-dsl.json    |  3 +++
 9 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/enrich.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/enrich.json
index 33c2950..c92a660c 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/enrich.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/enrich.json
@@ -19,6 +19,7 @@
     "shareUnitOfWork": { "kind": "attribute", "displayName": "Share Unit Of Work", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individua [...]
     "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to  [...]
     "ignoreInvalidEndpoint": { "kind": "attribute", "displayName": "Ignore Invalid Endpoint", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
+    "allowOptimisedComponents": { "kind": "attribute", "displayName": "Allow Optimised Components", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to allow components to optimise enricher if they are org.apache.camel.spi.SendDynamicAware ." },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/enrich.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/enrich.json
index 33c2950..c92a660c 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/enrich.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/enrich.json
@@ -19,6 +19,7 @@
     "shareUnitOfWork": { "kind": "attribute", "displayName": "Share Unit Of Work", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individua [...]
     "cacheSize": { "kind": "attribute", "displayName": "Cache Size", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to  [...]
     "ignoreInvalidEndpoint": { "kind": "attribute", "displayName": "Ignore Invalid Endpoint", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" },
+    "allowOptimisedComponents": { "kind": "attribute", "displayName": "Allow Optimised Components", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to allow components to optimise enricher if they are org.apache.camel.spi.SendDynamicAware ." },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/EnrichDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 670519f..94fd2da 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -57,6 +57,9 @@ public class EnrichDefinition extends ExpressionNode {
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
     private String ignoreInvalidEndpoint;
+    @XmlAttribute
+    @Metadata(label = "advanced", defaultValue = "true", javaType = "java.lang.Boolean")
+    private String allowOptimisedComponents;
 
     public EnrichDefinition() {
         this(null);
@@ -208,6 +211,25 @@ public class EnrichDefinition extends ExpressionNode {
         return this;
     }
 
+    /**
+     * Whether to allow components to optimise enricher if they are {@link org.apache.camel.spi.SendDynamicAware}.
+     *
+     * @return the builder
+     */
+    public EnrichDefinition allowOptimisedComponents(boolean allowOptimisedComponents) {
+        return allowOptimisedComponents(Boolean.toString(allowOptimisedComponents));
+    }
+
+    /**
+     * Whether to allow components to optimise enricher if they are {@link org.apache.camel.spi.SendDynamicAware}.
+     *
+     * @return the builder
+     */
+    public EnrichDefinition allowOptimisedComponents(String allowOptimisedComponents) {
+        setAllowOptimisedComponents(allowOptimisedComponents);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -283,4 +305,13 @@ public class EnrichDefinition extends ExpressionNode {
     public void setIgnoreInvalidEndpoint(String ignoreInvalidEndpoint) {
         this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
     }
+
+    public String getAllowOptimisedComponents() {
+        return allowOptimisedComponents;
+    }
+
+    public void setAllowOptimisedComponents(String allowOptimisedComponents) {
+        this.allowOptimisedComponents = allowOptimisedComponents;
+    }
+
 }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index b66de78..86e25ae 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -56,7 +56,7 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition>
     @Metadata(javaType = "java.lang.Boolean")
     private String ignoreInvalidEndpoint;
     @XmlAttribute
-    @Metadata(defaultValue = "true", javaType = "java.lang.Boolean")
+    @Metadata(label = "advanced", defaultValue = "true", javaType = "java.lang.Boolean")
     private String allowOptimisedComponents;
     @XmlAttribute
     @Metadata(label = "advanced", defaultValue = "true", javaType = "java.lang.Boolean")
@@ -192,15 +192,6 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition>
      *
      * @return the builder
      */
-    public ToDynamicDefinition allowOptimisedComponents() {
-        return allowOptimisedComponents(true);
-    }
-
-    /**
-     * Whether to allow components to optimise toD if they are {@link org.apache.camel.spi.SendDynamicAware}.
-     *
-     * @return the builder
-     */
     public ToDynamicDefinition allowOptimisedComponents(boolean allowOptimisedComponents) {
         return allowOptimisedComponents(Boolean.toString(allowOptimisedComponents));
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index c465c0c..6a0c41f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -64,6 +64,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private boolean shareUnitOfWork;
     private int cacheSize;
     private boolean ignoreInvalidEndpoint;
+    protected boolean allowOptimisedComponents = true;
     private ProcessorExchangeFactory processorExchangeFactory;
     private SendDynamicProcessor sendDynamicProcessor;
 
@@ -149,6 +150,14 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
     }
 
+    public boolean isAllowOptimisedComponents() {
+        return allowOptimisedComponents;
+    }
+
+    public void setAllowOptimisedComponents(boolean allowOptimisedComponents) {
+        this.allowOptimisedComponents = allowOptimisedComponents;
+    }
+
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
@@ -232,6 +241,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         this.sendDynamicProcessor.setCamelContext(camelContext);
         this.sendDynamicProcessor.setCacheSize(cacheSize);
         this.sendDynamicProcessor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+        this.sendDynamicProcessor.setAllowOptimisedComponents(allowOptimisedComponents);
 
         // create a per processor exchange factory
         this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index ffe0387..f67d4b4 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -52,6 +52,9 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
         if (definition.getAggregateOnException() != null) {
             enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
         }
+        if (definition.getAllowOptimisedComponents() != null) {
+            enricher.setAllowOptimisedComponents(parseBoolean(definition.getAllowOptimisedComponents(), true));
+        }
 
         return enricher;
     }
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index d5e9be0..c8c9b03 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -351,6 +351,7 @@ public class ModelParser extends BaseParser {
         return doParse(new EnrichDefinition(), (def, key, val) -> {
             switch (key) {
                 case "aggregateOnException": def.setAggregateOnException(val); break;
+                case "allowOptimisedComponents": def.setAllowOptimisedComponents(val); break;
                 case "cacheSize": def.setCacheSize(val); break;
                 case "ignoreInvalidEndpoint": def.setIgnoreInvalidEndpoint(val); break;
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 7080892..4826e21 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -3978,6 +3978,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "strategy-method-allow-null", type = "string"),
                     @YamlProperty(name = "strategy-method-name", type = "string"),
                     @YamlProperty(name = "strategy-ref", type = "string"),
+                    @YamlProperty(name = "allow-optimised-components", type = "boolean"),
                     @YamlProperty(name = "cache-size", type = "number"),
                     @YamlProperty(name = "expression", type = "object:org.apache.camel.model.language.ExpressionDefinition"),
                     @YamlProperty(name = "ignore-invalid-endpoint", type = "boolean"),
@@ -4020,6 +4021,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setAggregationStrategyRef(val);
                     break;
                 }
+                case "allow-optimised-components": {
+                    String val = asText(node);
+                    target.setAllowOptimisedComponents(val);
+                    break;
+                }
                 case "cache-size": {
                     String val = asText(node);
                     target.setCacheSize(val);
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
index 5fe6593..821b910 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
@@ -562,6 +562,9 @@
           "aggregate-on-exception" : {
             "type" : "boolean"
           },
+          "allow-optimised-components" : {
+            "type" : "boolean"
+          },
           "cache-size" : {
             "type" : "number"
           },

[camel] 01/02: CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 98fceb97004c115dcfbccee339c2c4742f3d1dd6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Apr 10 10:53:08 2021 +0200

    CAMEL-16458: Enricher EIP - Use SendDynamicProcessor to call endpoint instead of own code.
---
 .../java/org/apache/camel/processor/Enricher.java  | 223 ++-------------------
 1 file changed, 14 insertions(+), 209 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 847fe72..c465c0c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -18,35 +18,24 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.AsyncProducer;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.ProcessorExchangeFactory;
-import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteIdAware;
-import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.cache.DefaultProducerCache;
-import org.apache.camel.support.cache.EmptyProducerCache;
 import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +58,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private CamelContext camelContext;
     private String id;
     private String routeId;
-    private ProducerCache producerCache;
     private final Expression expression;
     private AggregationStrategy aggregationStrategy;
     private boolean aggregateOnException;
@@ -77,6 +65,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private int cacheSize;
     private boolean ignoreInvalidEndpoint;
     private ProcessorExchangeFactory processorExchangeFactory;
+    private SendDynamicProcessor sendDynamicProcessor;
 
     public Enricher(Expression expression) {
         this.expression = expression;
@@ -117,7 +106,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
-        return producerCache.getEndpointUtilizationStatistics();
+        return sendDynamicProcessor.getEndpointUtilizationStatistics();
     }
 
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
@@ -160,72 +149,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
     }
 
-    /**
-     * Enriches the input data (<code>exchange</code>) by first obtaining additional data from an endpoint represented
-     * by an endpoint <code>producer</code> and second by aggregating input data and additional data. Aggregation of
-     * input data and additional data is delegated to an {@link AggregationStrategy} object set at construction time. If
-     * the message exchange with the resource endpoint fails then no aggregation will be done and the failed exchange
-     * content is copied over to the original message exchange.
-     *
-     * @param exchange input data.
-     */
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // which producer to use
-        final AsyncProducer producer;
-        final Endpoint endpoint;
-
-        // use dynamic endpoint so calculate the endpoint to use
-        Object recipient = null;
-        boolean prototype = cacheSize < 0;
-        try {
-            recipient = expression.evaluate(exchange, Object.class);
-            recipient = prepareRecipient(exchange, recipient);
-            Endpoint existing = getExistingEndpoint(exchange, recipient);
-            if (existing == null) {
-                endpoint = resolveEndpoint(exchange, recipient, prototype);
-            } else {
-                endpoint = existing;
-                // we have an existing endpoint then its not a prototype scope
-                prototype = false;
-            }
-            // acquire the producer from the cache
-            producer = producerCache.acquireProducer(endpoint);
-        } catch (Throwable e) {
-            if (isIgnoreInvalidEndpoint()) {
-                LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
-            } else {
-                exchange.setException(e);
-            }
-            callback.done(true);
-            return true;
-        }
-
         final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
-        final Endpoint destination = producer.getEndpoint();
-
-        StopWatch sw = null;
-        boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination);
-        if (sending) {
-            sw = new StopWatch();
-        }
-        // record timing for sending the exchange using the producer
-        final StopWatch watch = sw;
-        final boolean prototypeEndpoint = prototype;
-        AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
-        boolean sync = ap.process(resourceExchange, new AsyncCallback() {
+        return sendDynamicProcessor.process(resourceExchange, new AsyncCallback() {
+            @Override
             public void done(boolean doneSync) {
-                // we only have to handle async completion
-                if (doneSync) {
-                    return;
-                }
-
-                // emit event that the exchange was sent to the endpoint
-                if (watch != null) {
-                    long timeTaken = watch.taken();
-                    EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken);
-                }
-
                 if (!isAggregateOnException() && resourceExchange.isFailed()) {
                     // copy resource exchange onto original exchange (preserving pattern)
                     copyResultsPreservePattern(exchange, resourceExchange);
@@ -251,132 +180,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                     }
                 }
 
-                // set property with the uri of the endpoint enriched so we can use that for tracing etc
-                exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
-
-                // return the producer back to the cache
-                try {
-                    producerCache.releaseProducer(endpoint, producer);
-                } catch (Exception e) {
-                    // ignore
-                }
-                // and stop prototype endpoints
-                if (prototypeEndpoint) {
-                    ServiceHelper.stopAndShutdownService(endpoint);
-                }
-
                 // and release resource exchange back in pool
                 processorExchangeFactory.release(resourceExchange);
 
-                callback.done(false);
+                callback.done(doneSync);
             }
         });
-
-        if (!sync) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-            }
-            // the remainder of the routing slip will be completed async
-            // so we break out now, then the callback will be invoked which then continue routing from where we left here
-            return false;
-        }
-
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
-        }
-
-        if (watch != null) {
-            // emit event that the exchange was sent to the endpoint
-            long timeTaken = watch.taken();
-            EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken);
-        }
-
-        if (!isAggregateOnException() && resourceExchange.isFailed()) {
-            // copy resource exchange onto original exchange (preserving pattern)
-            copyResultsPreservePattern(exchange, resourceExchange);
-        } else {
-            prepareResult(exchange);
-
-            try {
-                // prepare the exchanges for aggregation
-                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-                MessageHelper.resetStreamCache(exchange.getIn());
-
-                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
-                if (aggregatedExchange != null) {
-                    // copy aggregation result onto original exchange (preserving pattern)
-                    copyResultsPreservePattern(exchange, aggregatedExchange);
-                }
-            } catch (Throwable e) {
-                // if the aggregationStrategy threw an exception, set it on the original exchange
-                exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
-            }
-        }
-
-        // set property with the uri of the endpoint enriched so we can use that for tracing etc
-        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
-
-        // return the producer back to the cache
-        try {
-            producerCache.releaseProducer(endpoint, producer);
-        } catch (Exception e) {
-            // ignore
-        }
-        // and stop prototype endpoints
-        if (prototypeEndpoint) {
-            ServiceHelper.stopAndShutdownService(endpoint);
-        }
-
-        // and release resource exchange back in pool
-        processorExchangeFactory.release(resourceExchange);
-
-        callback.done(true);
-        return true;
-    }
-
-    protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
-        if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
-            return recipient;
-        } else if (recipient instanceof String) {
-            // trim strings as end users might have added spaces between separators
-            recipient = ((String) recipient).trim();
-        }
-        if (recipient != null) {
-            ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
-            String uri;
-            if (recipient instanceof String) {
-                uri = (String) recipient;
-            } else {
-                // convert to a string type we can work with
-                uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
-            }
-            // optimize and normalize endpoint
-            return ecc.normalizeUri(uri);
-        }
-        return null;
-    }
-
-    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
-        if (recipient instanceof Endpoint) {
-            return (Endpoint) recipient;
-        }
-        if (recipient != null) {
-            if (recipient instanceof NormalizedEndpointUri) {
-                NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
-                ExtendedCamelContext ecc = exchange.getContext().adapt(ExtendedCamelContext.class);
-                return ecc.hasEndpoint(nu);
-            } else {
-                String uri = recipient.toString();
-                return exchange.getContext().hasEndpoint(uri);
-            }
-        }
-        return null;
-    }
-
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
-        return prototype
-                ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient)
-                : ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
@@ -418,6 +227,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
     @Override
     protected void doBuild() throws Exception {
+        // use send dynamic to send to endpoint
+        this.sendDynamicProcessor = new SendDynamicProcessor(null, getExpression());
+        this.sendDynamicProcessor.setCamelContext(camelContext);
+        this.sendDynamicProcessor.setCacheSize(cacheSize);
+        this.sendDynamicProcessor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+
         // create a per processor exchange factory
         this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
                 .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
@@ -430,27 +245,17 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
         }
-        ServiceHelper.buildService(processorExchangeFactory);
+        ServiceHelper.buildService(processorExchangeFactory, sendDynamicProcessor);
     }
 
     @Override
     protected void doStart() throws Exception {
-        if (producerCache == null) {
-            if (cacheSize < 0) {
-                producerCache = new EmptyProducerCache(this, camelContext);
-                LOG.debug("Enricher {} is not using ProducerCache", this);
-            } else {
-                producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
-                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
-            }
-        }
-
-        ServiceHelper.startService(processorExchangeFactory, producerCache, aggregationStrategy);
+        ServiceHelper.startService(processorExchangeFactory, aggregationStrategy, sendDynamicProcessor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(aggregationStrategy, producerCache, processorExchangeFactory);
+        ServiceHelper.stopService(aggregationStrategy, processorExchangeFactory, sendDynamicProcessor);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {