You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/12 11:26:37 UTC

[camel] 33/44: Introduce interfaces for ConsumerCache and ProducerCache

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 89bf464001137400bdfc810f0a7c7bde04371a35
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Fri Oct 5 10:01:41 2018 +0200

    Introduce interfaces for ConsumerCache and ProducerCache
---
 .../main/java/org/apache/camel/DynamicRouter.java  |   2 +-
 .../main/java/org/apache/camel/RoutingSlip.java    |   2 +-
 .../java/org/apache/camel/spi/ConsumerCache.java   |  55 ++++++++
 .../java/org/apache/camel/spi/ProducerCache.java   |  87 ++++++++++++
 .../src/main/docs/eips/dynamicRouter-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/enrich-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/pollEnrich-eip.adoc  |   2 +-
 .../src/main/docs/eips/recipientList-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/routingSlip-eip.adoc |   2 +-
 camel-core/src/main/docs/eips/toD-eip.adoc         |   2 +-
 camel-core/src/main/docs/eips/wireTap-eip.adoc     |   2 +-
 ...onsumerCache.java => DefaultConsumerCache.java} |   5 +-
 .../apache/camel/impl/DefaultConsumerTemplate.java |   5 +-
 ...roducerCache.java => DefaultProducerCache.java} |  24 +---
 .../apache/camel/impl/DefaultProducerTemplate.java |   5 +-
 .../DefaultManagementLifecycleStrategy.java        |   4 +-
 .../management/mbean/ManagedConsumerCache.java     |   2 +-
 .../management/mbean/ManagedProducerCache.java     |   2 +-
 .../camel/model/DynamicRouterDefinition.java       |   3 +-
 .../org/apache/camel/model/EnrichDefinition.java   |   3 +-
 .../apache/camel/model/PollEnrichDefinition.java   |   3 +-
 .../apache/camel/model/ProcessorDefinition.java    |   3 +-
 .../camel/model/RecipientListDefinition.java       |   2 +-
 .../apache/camel/model/RoutingSlipDefinition.java  |   2 +-
 .../apache/camel/model/ToDynamicDefinition.java    |   2 +-
 .../org/apache/camel/model/WireTapDefinition.java  |   2 +-
 .../java/org/apache/camel/processor/Enricher.java  |   5 +-
 .../org/apache/camel/processor/PollEnricher.java   |   5 +-
 .../org/apache/camel/processor/RecipientList.java  |   5 +-
 .../camel/processor/RecipientListProcessor.java    |   5 +-
 .../org/apache/camel/processor/RoutingSlip.java    | 148 ++++++++++-----------
 .../camel/processor/SendDynamicProcessor.java      |   5 +-
 .../org/apache/camel/processor/SendProcessor.java  |   5 +-
 .../camel/impl/ConsumerCacheZeroCapacityTest.java  |   2 +-
 .../camel/impl/DefaultConsumerCacheTest.java       |   2 +-
 .../camel/impl/DefaultProducerCacheTest.java       |   6 +-
 .../apache/camel/impl/EmptyProducerCacheTest.java  |   4 +-
 .../camel/impl/ProducerCacheNonSingletonTest.java  |   2 +-
 .../management/ManagedConsumerCacheHitsTest.java   |   4 +-
 .../camel/management/ManagedConsumerCacheTest.java |   4 +-
 .../client/GrpcResponseRouterStreamObserver.java   |   5 +-
 41 files changed, 284 insertions(+), 150 deletions(-)

diff --git a/camel-api/src/main/java/org/apache/camel/DynamicRouter.java b/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
index 1e94c4f..278d296 100644
--- a/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
+++ b/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
@@ -60,7 +60,7 @@ public @interface DynamicRouter {
     boolean ignoreInvalidEndpoints() default false;
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this dynamic router, when uris are reused.
      */
     int cacheSize() default 0;
diff --git a/camel-api/src/main/java/org/apache/camel/RoutingSlip.java b/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
index 2c1dca4..bb996aa 100644
--- a/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
+++ b/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
@@ -60,7 +60,7 @@ public @interface RoutingSlip {
     boolean ignoreInvalidEndpoints() default false;
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this routing slip, when uris are reused.
      */
     int cacheSize() default 0;
diff --git a/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java b/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java
new file mode 100644
index 0000000..67c505a
--- /dev/null
+++ b/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Service;
+
+public interface ConsumerCache extends Service {
+
+    PollingConsumer acquirePollingConsumer(Endpoint endpoint);
+
+    void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer);
+
+    Exchange receive(Endpoint endpoint);
+
+    Exchange receive(Endpoint endpoint, long timeout);
+
+    Exchange receiveNoWait(Endpoint endpoint);
+
+    Object getSource();
+
+    int size();
+
+    int getCapacity();
+
+    long getHits();
+
+    long getMisses();
+
+    long getEvicted();
+
+    void resetCacheStatistics();
+
+    void purge();
+
+    void cleanUp();
+
+    EndpointUtilizationStatistics getEndpointUtilizationStatistics();
+}
diff --git a/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java b/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java
new file mode 100644
index 0000000..51f7c8e
--- /dev/null
+++ b/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
+
+public interface ProducerCache extends Service {
+
+    AsyncProducer acquireProducer(Endpoint endpoint);
+
+    void releaseProducer(Endpoint endpoint, AsyncProducer producer);
+
+    Exchange send(Endpoint endpoint, Exchange exchange, Processor resultProcessor);
+
+    CompletableFuture<Exchange> asyncSendExchange(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, Exchange inExchange, CompletableFuture<Exchange> exchangeFuture);
+
+    Object getSource();
+
+    int size();
+
+    int getCapacity();
+
+    long getHits();
+
+    long getMisses();
+
+    long getEvicted();
+
+    void resetCacheStatistics();
+
+    void purge();
+
+    void cleanUp();
+
+    boolean isEventNotifierEnabled();
+
+    void setEventNotifierEnabled(boolean eventNotifierEnabled);
+
+    EndpointUtilizationStatistics getEndpointUtilizationStatistics();
+
+    boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, AsyncCallback callback, AsyncProducerCallback asyncProducerCallback);
+
+    /**
+     * Callback for sending an exchange message to an endpoint using an {@link AsyncProcessor} capable producer.
+     * <p/>
+     * Using this callback as a template pattern ensures that Camel handles the resource handling and will
+     * start and stop the given producer, to avoid resource leaks.
+     *
+     */
+    interface AsyncProducerCallback {
+
+        /**
+         * Performs operation on the given producer to send the given exchange.
+         *
+         * @param asyncProducer   the async producer, is never <tt>null</tt>
+         * @param exchange        the exchange to process
+         * @param callback        the async callback
+         * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
+         */
+        boolean doInAsyncProducer(AsyncProducer asyncProducer, Exchange exchange, AsyncCallback callback);
+    }
+
+}
diff --git a/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc b/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
index f2e98bb..80ed1d3 100644
--- a/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
+++ b/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
@@ -23,7 +23,7 @@ The Dynamic Router EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this dynamic router, when uris are reused. |  | Integer
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/docs/eips/enrich-eip.adoc b/camel-core/src/main/docs/eips/enrich-eip.adoc
index 295132f..e2d284b 100644
--- a/camel-core/src/main/docs/eips/enrich-eip.adoc
+++ b/camel-core/src/main/docs/eips/enrich-eip.adoc
@@ -16,7 +16,7 @@ The Enrich EIP supports 7 options which are listed below:
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and the resource exchange. Enrich will by default not share unit of work between the parent exchange and the resource exchange. This means the resource exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producer when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producer when uris are reused. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/camel-core/src/main/docs/eips/pollEnrich-eip.adoc b/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
index 2d699b4..9207655 100644
--- a/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
+++ b/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
@@ -49,7 +49,7 @@ The Poll Enrich EIP supports 7 options which are listed below:
 | *strategyMethodName* | This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. |  | String
 | *strategyMethodAllowNull* | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ConsumerCache which is used to cache and reuse consumers when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/camel-core/src/main/docs/eips/recipientList-eip.adoc b/camel-core/src/main/docs/eips/recipientList-eip.adoc
index c4377d4..7245644 100644
--- a/camel-core/src/main/docs/eips/recipientList-eip.adoc
+++ b/camel-core/src/main/docs/eips/recipientList-eip.adoc
@@ -26,7 +26,7 @@ The Recipient List EIP supports 15 options which are listed below:
 | *timeout* | Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shu [...]
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. |  | Integer
 | *parallelAggregate* | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. | false | Boolean
 | *stopOnAggregateException* | If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this behavior. The default value is false for the sake of backward compatibility. | false | Boolean
 |===
diff --git a/camel-core/src/main/docs/eips/routingSlip-eip.adoc b/camel-core/src/main/docs/eips/routingSlip-eip.adoc
index 38e37c1..4de7c26 100644
--- a/camel-core/src/main/docs/eips/routingSlip-eip.adoc
+++ b/camel-core/src/main/docs/eips/routingSlip-eip.adoc
@@ -15,7 +15,7 @@ The Routing Slip EIP supports 3 options which are listed below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this routing slip, when uris are reused. |  | Integer
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/docs/eips/toD-eip.adoc b/camel-core/src/main/docs/eips/toD-eip.adoc
index f788864..82a7a5d 100644
--- a/camel-core/src/main/docs/eips/toD-eip.adoc
+++ b/camel-core/src/main/docs/eips/toD-eip.adoc
@@ -17,7 +17,7 @@ The To D EIP supports 5 options which are listed below:
 | Name | Description | Default | Type
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===
diff --git a/camel-core/src/main/docs/eips/wireTap-eip.adoc b/camel-core/src/main/docs/eips/wireTap-eip.adoc
index b9f3a6f..f8473d4 100644
--- a/camel-core/src/main/docs/eips/wireTap-eip.adoc
+++ b/camel-core/src/main/docs/eips/wireTap-eip.adoc
@@ -31,7 +31,7 @@ The Wire Tap EIP supports 11 options which are listed below:
 | *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. |  | String
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. |  | Integer
+| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
similarity index 97%
rename from camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
rename to camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
index f25f002..6eed897 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
@@ -21,6 +21,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -29,7 +30,7 @@ import org.apache.camel.support.ServiceSupport;
 /**
  * Cache containing created {@link org.apache.camel.Consumer}.
  */
-public class ConsumerCache extends ServiceSupport {
+public class DefaultConsumerCache extends ServiceSupport implements ConsumerCache {
 
     private final CamelContext camelContext;
     private final ServicePool<PollingConsumer> consumers;
@@ -39,7 +40,7 @@ public class ConsumerCache extends ServiceSupport {
     private boolean extendedStatistics;
     private int maxCacheSize;
 
-    public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
+    public DefaultConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
         this.source = source;
         this.camelContext = camelContext;
         this.maxCacheSize = cacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
index ba52514..257d5c3 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
@@ -22,6 +22,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -246,7 +247,7 @@ public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerT
         return answer;
     }
 
-    private ConsumerCache getConsumerCache() {
+    private org.apache.camel.spi.ConsumerCache getConsumerCache() {
         if (!isStarted()) {
             throw new IllegalStateException("ConsumerTemplate has not been started");
         }
@@ -255,7 +256,7 @@ public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerT
 
     protected void doStart() throws Exception {
         if (consumerCache == null) {
-            consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize);
+            consumerCache = new DefaultConsumerCache(this, camelContext, maximumCacheSize);
         }
         ServiceHelper.startService(consumerCache);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
similarity index 94%
rename from camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
rename to camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
index d713775..5638f9d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
@@ -31,6 +31,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.SharedCamelInternalProcessor;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -40,7 +41,7 @@ import org.apache.camel.util.StopWatch;
 /**
  * Cache containing created {@link Producer}.
  */
-public class ProducerCache extends ServiceSupport {
+public class DefaultProducerCache extends ServiceSupport implements ProducerCache {
 
     private final CamelContext camelContext;
     private final ServicePool<AsyncProducer> producers;
@@ -52,7 +53,7 @@ public class ProducerCache extends ServiceSupport {
     private boolean extendedStatistics;
     private int maxCacheSize;
 
-    public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
+    public DefaultProducerCache(Object source, CamelContext camelContext, int cacheSize) {
         this.source = source;
         this.camelContext = camelContext;
         this.maxCacheSize = cacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
@@ -473,23 +474,4 @@ public class ProducerCache extends ServiceSupport {
         return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
     }
 
-    /**
-     * Callback for sending a exchange message to a endpoint using an {@link AsyncProcessor} capable producer.
-     * <p/>
-     * Using this callback as a template pattern ensures that Camel handles the resource handling and will
-     * start and stop the given producer, to avoid resource leaks.
-     *
-         */
-    public interface AsyncProducerCallback {
-
-        /**
-         * Performs operation on the given producer to send the given exchange.
-         *
-         * @param asyncProducer   the async producer, is never <tt>null</tt>
-         * @param exchange        the exchange to process
-         * @param callback        the async callback
-         * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
-         */
-        boolean doInAsyncProducer(AsyncProducer asyncProducer, Exchange exchange, AsyncCallback callback);
-    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 9c09cf3..d4dedfe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -34,6 +34,7 @@ import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.processor.ConvertBodyProcessor;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ExchangeHelper;
@@ -679,7 +680,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
                 });
     }
 
-    private ProducerCache getProducerCache() {
+    private org.apache.camel.spi.ProducerCache getProducerCache() {
         if (!isStarted()) {
             throw new IllegalStateException("ProducerTemplate has not been started");
         }
@@ -713,7 +714,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, maximumCacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, maximumCacheSize);
             producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
         }
 
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 1a1a7a3..5806f35 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -48,11 +48,9 @@ import org.apache.camel.StartupListener;
 import org.apache.camel.TimerListener;
 import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultEndpointRegistry;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
@@ -87,6 +85,7 @@ import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.runtimecatalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.InflightRepository;
@@ -95,6 +94,7 @@ import org.apache.camel.spi.ManagementAgent;
 import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementObjectStrategy;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RestRegistry;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
index 8ccf129..def0b3d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
@@ -19,7 +19,7 @@ package org.apache.camel.management.mbean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedConsumerCacheMBean;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.spi.ConsumerCache;
 
 @ManagedResource(description = "Managed ConsumerCache")
 public class ManagedConsumerCache extends ManagedService implements ManagedConsumerCacheMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
index e72dc3c..5b2eac5 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
@@ -19,7 +19,7 @@ package org.apache.camel.management.mbean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedProducerCacheMBean;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.spi.ProducerCache;
 
 @ManagedResource(description = "Managed ProducerCache")
 public class ManagedProducerCache extends ManagedService implements ManagedProducerCacheMBean {
diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index 05a29fa..7f8dda6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -28,6 +28,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.DynamicRouter;
 import org.apache.camel.spi.Metadata;
@@ -169,7 +170,7 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
     }
     
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link DefaultProducerCache} which is used
      * to cache and reuse producers when using this dynamic router, when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index f54b9fd..69ee984 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -187,7 +188,7 @@ public class EnrichDefinition extends NoOutputExpressionNode {
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link DefaultProducerCache} which is used
      * to cache and reuse producer when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 667832e..87647d0 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -200,7 +201,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
+     * Sets the maximum size used by the {@link DefaultConsumerCache} which is used
      * to cache and reuse consumers when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 2d6e282..9094494 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -57,7 +57,6 @@ import org.apache.camel.model.dataformat.CustomDataFormat;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
-import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
@@ -635,7 +634,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * Sends the exchange to the given dynamic endpoint
      *
      * @param uri  the dynamic endpoint to send to (resolved using simple language by default)
-     * @param cacheSize sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers.
+     * @param cacheSize sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used to cache and reuse producers.
      *
      * @return the builder
      */
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 9a4e945..48bc007 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -456,7 +456,7 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this recipient list, when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index d5caaef..b8495ce 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -182,7 +182,7 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this routing slip, when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index 35065fe..11aea0b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -151,7 +151,7 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition>
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers.
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used to cache and reuse producers.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
      * @return the builder
diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index a1aab13..d152cf4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -307,7 +307,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers, when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 35b26ac..13688f6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -26,10 +26,11 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -342,7 +343,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware,
         }
 
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
             log.debug("Enricher {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity());
         }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 08a4a96..28842c6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,8 +26,9 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
@@ -369,7 +370,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
     protected void doStart() throws Exception {
         if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
-            consumerCache = new ConsumerCache(this, camelContext, cacheSize);
+            consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize);
             log.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
         }
         if (aggregationStrategy instanceof CamelContextAware) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 76630e3..10dcd43 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -26,11 +26,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ObjectHelper;
@@ -181,7 +182,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
             log.debug("RecipientList {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity());
         }
         ServiceHelper.startService(aggregationStrategy, producerCache);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 67fc170..77d4401 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -31,8 +31,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.EndpointHelper;
@@ -265,7 +266,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     protected void doStart() throws Exception {
         super.doStart();
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, getCamelContext(), 0);
+            producerCache = new DefaultProducerCache(this, getCamelContext(), 0);
         }
         ServiceHelper.startService(producerCache);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index a2decf7..05e9a1e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -29,10 +29,10 @@ import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
-import org.apache.camel.impl.ProducerCache;
-import org.apache.camel.impl.ProducerCache.AsyncProducerCallback;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -345,98 +345,96 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                 originalCallback.done(false);
             }
         };
-        return producerCache.doInAsyncProducer(endpoint, exchange, callback, new AsyncProducerCallback() {
-            public boolean doInAsyncProducer(AsyncProducer asyncProducer, Exchange exchange, final AsyncCallback callback) {
-
-                // rework error handling to support fine grained error handling
-                RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
-                AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
-
-                // set property which endpoint we send to and the producer that can do it
-                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
-                exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
-                exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer);
-
-                return target.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        // cleanup producer after usage
-                        exchange.removeProperty(Exchange.SLIP_PRODUCER);
-
-                        // we only have to handle async completion of the routing slip
-                        if (doneSync) {
-                            callback.done(true);
-                            return;
-                        }
-
-                        try {
-                            // continue processing the routing slip asynchronously
-                            Exchange current = prepareExchangeForRoutingSlip(exchange, endpoint);
+        return producerCache.doInAsyncProducer(endpoint, exchange, callback, (p, ex, cb) -> {
+
+            // rework error handling to support fine grained error handling
+            RouteContext routeContext = ex.getUnitOfWork() != null ? ex.getUnitOfWork().getRouteContext() : null;
+            AsyncProcessor target = createErrorHandler(routeContext, ex, p, endpoint);
+
+            // set property which endpoint we send to and the producer that can do it
+            ex.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
+            ex.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
+            ex.setProperty(Exchange.SLIP_PRODUCER, p);
+
+            return target.process(ex, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // cleanup producer after usage
+                    ex.removeProperty(Exchange.SLIP_PRODUCER);
+
+                    // we only have to handle async completion of the routing slip
+                    if (doneSync) {
+                        cb.done(true);
+                        return;
+                    }
 
-                            while (iter.hasNext(current)) {
+                    try {
+                        // continue processing the routing slip asynchronously
+                        Exchange current = prepareExchangeForRoutingSlip(ex, endpoint);
 
-                                // we ignore some kind of exceptions and allow us to continue
-                                if (isIgnoreInvalidEndpoints()) {
-                                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
-                                    if (e != null) {
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
-                                        }
-                                        current.setException(null);
-                                    }
-                                }
-
-                                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
-                                // check for error if so we should break out
-                                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
-                                    break;
-                                }
+                        while (iter.hasNext(current)) {
 
-                                Endpoint endpoint;
-                                try {
-                                    endpoint = resolveEndpoint(iter, exchange);
-                                    // if no endpoint was resolved then try the next
-                                    if (endpoint == null) {
-                                        continue;
+                            // we ignore some kind of exceptions and allow us to continue
+                            if (isIgnoreInvalidEndpoints()) {
+                                FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
+                                if (e != null) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
                                     }
-                                } catch (Exception e) {
-                                    // error resolving endpoint so we should break out
-                                    exchange.setException(e);
-                                    break;
+                                    current.setException(null);
                                 }
+                            }
 
-                                // prepare and process the routing slip
-                                boolean sync = processExchange(endpoint, current, original, callback, iter);
-                                current = prepareExchangeForRoutingSlip(current, endpoint);
+                            // Decide whether to continue with the recipients or not; similar logic to the Pipeline
+                            // check for error if so we should break out
+                            if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
+                                break;
+                            }
 
-                                if (!sync) {
-                                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
-                                    return;
+                            Endpoint endpoint1;
+                            try {
+                                endpoint1 = resolveEndpoint(iter, ex);
+                                // if no endpoint was resolved then try the next
+                                if (endpoint1 == null) {
+                                    continue;
                                 }
+                            } catch (Exception e) {
+                                // error resolving endpoint so we should break out
+                                ex.setException(e);
+                                break;
                             }
 
-                            // logging nextExchange as it contains the exchange that might have altered the payload and since
-                            // we are logging the completion if will be confusing if we log the original instead
-                            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-                            log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
+                            // prepare and process the routing slip
+                            boolean sync = processExchange(endpoint1, current, original, cb, iter);
+                            current = prepareExchangeForRoutingSlip(current, endpoint1);
 
-                            // copy results back to the original exchange
-                            ExchangeHelper.copyResults(original, current);
-                        } catch (Throwable e) {
-                            exchange.setException(e);
+                            if (!sync) {
+                                log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
+                                return;
+                            }
                         }
 
-                        // okay we are completely done with the routing slip
-                        // so we need to signal done on the original callback so it can continue
-                        originalCallback.done(false);
+                        // logging nextExchange as it contains the exchange that might have altered the payload and since
+                        // we are logging the completion it will be confusing if we log the original instead
+                        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+                        log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
+
+                        // copy results back to the original exchange
+                        ExchangeHelper.copyResults(original, current);
+                    } catch (Throwable e) {
+                        ex.setException(e);
                     }
-                });
-            }
+
+                    // okay we are completely done with the routing slip
+                    // so we need to signal done on the original callback so it can continue
+                    originalCallback.done(false);
+                }
+            });
         });
     }
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
             log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity());
         }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 3879226..1499e88 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -27,9 +27,10 @@ import org.apache.camel.Expression;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.ResolveEndpointFailedException;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.SendDynamicAware;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.EndpointHelper;
@@ -246,7 +247,7 @@ public class SendDynamicProcessor extends ServiceSupport implements AsyncProcess
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
             log.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity());
         }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index c694231..b641ee5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -29,8 +29,9 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.InterceptSendToEndpoint;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.EventHelper;
@@ -203,7 +204,7 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
             // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory
             // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed,
             // eg stopping the producer when we stop etc.
-            producerCache = new ProducerCache(this, camelContext, 1);
+            producerCache = new DefaultProducerCache(this, camelContext, 1);
             // do not add as service as we do not want to manage the producer cache
         }
         ServiceHelper.startService(producerCache);
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
index a4f8a3f..8fccba7 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
@@ -30,7 +30,7 @@ public class ConsumerCacheZeroCapacityTest extends ContextTestSupport {
 
     @Test
     public void testConsumerCacheZeroCapacity() throws Exception {
-        ConsumerCache cache = new ConsumerCache(this, context, -1);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, -1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
index b1a792d..7e199fb 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
@@ -25,7 +25,7 @@ public class DefaultConsumerCacheTest extends ContextTestSupport {
 
     @Test
     public void testCacheConsumers() throws Exception {
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 0);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
index c7c42d2..c22d7ac 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
@@ -42,7 +42,7 @@ public class DefaultProducerCacheTest extends ContextTestSupport {
 
     @Test
     public void testCacheProducerAcquireAndRelease() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 0);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -64,7 +64,7 @@ public class DefaultProducerCacheTest extends ContextTestSupport {
 
     @Test
     public void testCacheStopExpired() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 5);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 5);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -96,7 +96,7 @@ public class DefaultProducerCacheTest extends ContextTestSupport {
 
     @Test
     public void testExtendedStatistics() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 5);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 5);
         cache.setExtendedStatistics(true);
         cache.start();
 
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
index de233cd..f1fc61e 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -25,7 +25,7 @@ public class EmptyProducerCacheTest extends ContextTestSupport {
 
     @Test
     public void testEmptyCache() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, -1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -45,7 +45,7 @@ public class EmptyProducerCacheTest extends ContextTestSupport {
 
     @Test
     public void testCacheProducerAcquireAndRelease() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, -1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java b/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
index 41d9e7b..0aba8a6 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
@@ -41,7 +41,7 @@ public class ProducerCacheNonSingletonTest extends ContextTestSupport {
     public void testNonSingleton() throws Exception {
         context.addComponent("dummy", new MyDummyComponent());
 
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, -1);
         cache.start();
 
         Endpoint endpoint = context.getEndpoint("dummy:foo");
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
index cb34c1c..50dd9f8 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
@@ -25,7 +25,7 @@ import javax.management.ObjectName;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.junit.Test;
 
 public class ManagedConsumerCacheHitsTest extends ManagementTestSupport {
@@ -40,7 +40,7 @@ public class ManagedConsumerCacheHitsTest extends ManagementTestSupport {
         // always register services in JMX so we can enlist our consumer template/cache
         context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
 
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 0);
         context.addService(cache);
 
         template.sendBody("seda:a", "Hello World");
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
index 2bc8ed5..d3349af 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
@@ -27,7 +27,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.junit.Test;
 
 public class ManagedConsumerCacheTest extends ManagementTestSupport {
@@ -42,7 +42,7 @@ public class ManagedConsumerCacheTest extends ManagementTestSupport {
         // always register services in JMX so we can enlist our consumer template/cache
         context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
 
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 0);
         context.addService(cache);
 
         template.sendBody("direct:start", "Hello World");
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
index 74d57fa..c485ea9 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
@@ -22,7 +22,8 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.grpc.GrpcConfiguration;
 import org.apache.camel.component.grpc.GrpcConstants;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.CamelContextHelper;
 
 /**
@@ -40,7 +41,7 @@ public class GrpcResponseRouterStreamObserver implements StreamObserver<Object>
         this.sourceEndpoint = sourceEndpoint;
         this.endpoint = CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), configuration.getStreamRepliesTo());
         sourceEndpoint.getCamelContext().createProducerTemplate(-1);
-        this.producerCache = new ProducerCache(this, sourceEndpoint.getCamelContext(), -1);
+        this.producerCache = new DefaultProducerCache(this, sourceEndpoint.getCamelContext(), -1);
     }
 
     @Override