You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/07 16:55:55 UTC

[camel] branch pool created (now 337b9c5)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch pool
in repository https://gitbox.apache.org/repos/asf/camel.git.


      at 337b9c5  CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.

This branch includes the following new commits:

     new 337b9c5  CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 337b9c579a73cfdae52721f2f04f76ea616681a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Mar 7 17:54:51 2021 +0100

    CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.
---
 .../java/org/apache/camel/spi/ExchangeFactory.java |  86 +-----------
 .../apache/camel/spi/InternalProcessorFactory.java |   4 +-
 .../org/apache/camel/spi/PooledObjectFactory.java  | 125 +++++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |   3 +
 .../camel/impl/engine/PooledExchangeFactory.java   |   5 +
 .../impl/engine/PrototypeExchangeFactory.java      | 111 ++-------------
 .../apache/camel/processor/PooledExchangeTask.java |  41 ++++++
 .../camel/processor/PooledExchangeTaskFactory.java |  56 ++++++++
 .../apache/camel/processor/PooledTaskFactory.java  |  64 +++++++++
 .../camel/processor/PrototypeTaskFactory.java      |  48 +++++++
 .../errorhandler/RedeliveryErrorHandler.java       | 137 ++++++++++++++-----
 .../camel/support/PooledObjectFactorySupport.java  | 152 +++++++++++++++++++++
 .../support/PrototypeObjectFactorySupport.java     | 139 +++++++++++++++++++
 13 files changed, 756 insertions(+), 215 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index fcedd54..29b3fdb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.spi;
 
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.NonManagedService;
-import org.apache.camel.Service;
 
 /**
  * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the
@@ -36,50 +34,7 @@ import org.apache.camel.Service;
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
  */
-public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService, RouteIdAware {
-
-    /**
-     * Utilization statistics of the this factory.
-     */
-    interface Statistics {
-
-        /**
-         * Number of new exchanges created.
-         */
-        long getCreatedCounter();
-
-        /**
-         * Number of exchanges acquired (reused) when using pooled factory.
-         */
-        long getAcquiredCounter();
-
-        /**
-         * Number of exchanges released back to pool
-         */
-        long getReleasedCounter();
-
-        /**
-         * Number of exchanges discarded (thrown away) such as if no space in cache pool.
-         */
-        long getDiscardedCounter();
-
-        /**
-         * Reset the counters
-         */
-        void reset();
-
-        /**
-         * Whether statistics is enabled.
-         */
-        boolean isStatisticsEnabled();
-
-        /**
-         * Sets whether statistics is enabled.
-         *
-         * @param statisticsEnabled <tt>true</tt> to enable
-         */
-        void setStatisticsEnabled(boolean statisticsEnabled);
-    }
+public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
 
     /**
      * Service factory key.
@@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, CamelContextAware, NonManagedS
     }
 
     /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
-     */
-    int getCapacity();
-
-    /**
-     * The current number of exchanges in the pool
-     */
-    int getSize();
-
-    /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
-     */
-    void setCapacity(int capacity);
-
-    /**
-     * Whether statistics is enabled.
-     */
-    boolean isStatisticsEnabled();
-
-    /**
-     * Whether statistics is enabled.
-     */
-    void setStatisticsEnabled(boolean statisticsEnabled);
-
-    /**
-     * Reset the statistics
-     */
-    void resetStatistics();
-
-    /**
-     * Purges the internal cache (if pooled)
-     */
-    void purge();
-
-    /**
-     * Gets the usage statistics
+     * Whether the factory is pooled.
      */
-    Statistics getStatistics();
+    boolean isPooled();
 
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index b3f8868..8a56f21 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -27,7 +27,9 @@ import org.apache.camel.Route;
 
 /**
  * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is
- * used to have loose coupling between the modules in core. Camel user user should only use {@link ProcessorFactory}.
+ * used to have loose coupling between the modules in core.
+ *
+ * Camel end user should NOT use this, but use {@link ProcessorFactory} instead.
  *
  * @see ProcessorFactory
  */
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
new file mode 100644
index 0000000..db4c0d1
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
+/**
+ * Factory for pooled objects or tasks.
+ */
+public interface PooledObjectFactory<T> extends Service, CamelContextAware {
+
+    /**
+     * Utilization statistics of the this factory.
+     */
+    interface Statistics {
+
+        /**
+         * Number of new exchanges created.
+         */
+        long getCreatedCounter();
+
+        /**
+         * Number of exchanges acquired (reused) when using pooled factory.
+         */
+        long getAcquiredCounter();
+
+        /**
+         * Number of exchanges released back to pool
+         */
+        long getReleasedCounter();
+
+        /**
+         * Number of exchanges discarded (thrown away) such as if no space in cache pool.
+         */
+        long getDiscardedCounter();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
+
+    /**
+     * The current number of objects in the pool
+     */
+    int getSize();
+
+    /**
+     * The capacity the pool uses for storing objects. The default capacity is 100.
+     */
+    int getCapacity();
+
+    /**
+     * The capacity the pool uses for storing objects. The default capacity is 100.
+     */
+    void setCapacity(int capacity);
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal cache (if pooled)
+     */
+    void purge();
+
+    /**
+     * Gets the usage statistics
+     */
+    Statistics getStatistics();
+
+    /**
+     * Acquires an object from the pool (if any)
+     *
+     * @return the object or <tt>null</tt> if the pool is empty
+     */
+    T acquire();
+
+    /**
+     * Releases the object back to the pool
+     *
+     * @param  t the object
+     * @return   true if released into the pool, or false if something went wrong and the object was discarded
+     */
+    boolean release(T t);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d7e6dd4..45fcf6d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends BaseService
         }
         bootstraps.clear();
 
+        if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) {
+            LOG.info("Pooled mode enabled. Camel pools and reuses objects to reduce JVM object allocations.");
+        }
         if (isLightweight()) {
             LOG.info("Lightweight mode enabled. Performing optimizations and memory reduction.");
             ReifierStrategy.clearReifiers();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 4079a46..b1647b5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -168,6 +168,11 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
     }
 
     @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
     protected void doStop() throws Exception {
         exchangeFactoryManager.removeExchangeFactory(this);
         logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index fb17cb0..eef1ea7 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.PooledObjectFactorySupport;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public class PrototypeExchangeFactory extends ServiceSupport implements ExchangeFactory {
+public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchange> implements ExchangeFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(PrototypeExchangeFactory.class);
 
-    final UtilizationStatistics statistics = new UtilizationStatistics();
     final Consumer consumer;
-    CamelContext camelContext;
     ExchangeFactoryManager exchangeFactoryManager;
     String routeId;
 
@@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
 
     @Override
     protected void doBuild() throws Exception {
+        super.doBuild();
         this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
     }
 
@@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
         PrototypeExchangeFactory answer = new PrototypeExchangeFactory(consumer);
         answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
@@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
+    public Exchange acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
     public Exchange create(boolean autoRelease) {
         if (statistics.isStatisticsEnabled()) {
             statistics.created.increment();
@@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
-    public boolean isStatisticsEnabled() {
-        return statistics.isStatisticsEnabled();
-    }
-
-    @Override
-    public void setStatisticsEnabled(boolean statisticsEnabled) {
-        statistics.setStatisticsEnabled(statisticsEnabled);
-    }
-
-    @Override
-    public int getCapacity() {
-        return 0;
-    }
-
-    @Override
-    public int getSize() {
-        return 0;
-    }
-
-    @Override
-    public void setCapacity(int capacity) {
-        // not in use
-    }
-
-    @Override
     public void resetStatistics() {
         statistics.reset();
     }
 
     @Override
-    public void purge() {
-        // not in use
-    }
-
-    @Override
-    public Statistics getStatistics() {
-        return statistics;
+    public boolean isPooled() {
+        return false;
     }
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.addExchangeFactory(this);
         }
@@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.removeExchangeFactory(this);
         }
@@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
         }
     }
 
-    /**
-     * Represents utilization statistics
-     */
-    final class UtilizationStatistics implements ExchangeFactory.Statistics {
-
-        boolean statisticsEnabled;
-        final LongAdder created = new LongAdder();
-        final LongAdder acquired = new LongAdder();
-        final LongAdder released = new LongAdder();
-        final LongAdder discarded = new LongAdder();
-
-        @Override
-        public void reset() {
-            created.reset();
-            acquired.reset();
-            released.reset();
-            discarded.reset();
-        }
-
-        @Override
-        public long getCreatedCounter() {
-            return created.longValue();
-        }
-
-        @Override
-        public long getAcquiredCounter() {
-            return acquired.longValue();
-        }
-
-        @Override
-        public long getReleasedCounter() {
-            return released.longValue();
-        }
-
-        @Override
-        public long getDiscardedCounter() {
-            return discarded.longValue();
-        }
-
-        @Override
-        public boolean isStatisticsEnabled() {
-            return statisticsEnabled;
-        }
-
-        @Override
-        public void setStatisticsEnabled(boolean statisticsEnabled) {
-            this.statisticsEnabled = statisticsEnabled;
-        }
-    }
-
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
new file mode 100644
index 0000000..d4e0226
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * A task that EIPs and internal routing engine uses to store state when processing an {@link Exchange}.
+ *
+ * @see org.apache.camel.processor.PooledExchangeTaskFactory
+ */
+public interface PooledExchangeTask extends Runnable {
+
+    /**
+     * Prepares the task for the given exchange and its callback
+     *
+     * @param exchange the exchange
+     * @param callback the callback
+     */
+    void prepare(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Resets the task after its done and can be reused for another exchange.
+     */
+    void reset();
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
new file mode 100644
index 0000000..b90e9f5
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.PooledObjectFactory;
+
+/**
+ * Factory to create {@link PooledExchangeTask}.
+ *
+ * @see PooledExchangeTask
+ */
+public interface PooledExchangeTaskFactory extends PooledObjectFactory<PooledExchangeTask> {
+
+    /**
+     * Creates a new task to use for processing the exchange.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask create(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Attempts to acquire a pooled task to use for processing the exchange, if not possible then a new task is created.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Releases the task after its done being used
+     *
+     * @param  task the task
+     * @return      true if the task was released, and false if the task failed to be released or no space in pool, and
+     *              the task was discarded.
+     */
+    boolean release(PooledExchangeTask task);
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
new file mode 100644
index 0000000..c1ce72e
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    @Override
+    public PooledExchangeTask acquire() {
+        return pool.poll();
+    }
+
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        PooledExchangeTask task = acquire();
+        if (task == null) {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
+            task = create(exchange, callback);
+        } else {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
+            }
+        }
+        task.prepare(exchange, callback);
+        return task;
+    }
+
+    @Override
+    public boolean release(PooledExchangeTask task) {
+        boolean inserted = pool.offer(task);
+        if (statistics.isStatisticsEnabled()) {
+            if (inserted) {
+                statistics.released.increment();
+            } else {
+                statistics.discarded.increment();
+            }
+        }
+        return inserted;
+    }
+
+    @Override
+    public String toString() {
+        return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
new file mode 100644
index 0000000..0c5c444
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PrototypeObjectFactorySupport;
+
+public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    @Override
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        PooledExchangeTask task = create(exchange, callback);
+        task.prepare(exchange, callback);
+        return task;
+    }
+
+    @Override
+    public PooledExchangeTask acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
+    public boolean release(PooledExchangeTask pooledTask) {
+        // not pooled
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PrototypeTaskFactory";
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 3c0eaf3..39d3658 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -38,6 +38,10 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
@@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class);
 
+    // factory
+    protected PooledExchangeTaskFactory taskFactory;
+
     // state
     protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
@@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not)
-        Runnable task;
-        if (simpleTask) {
-            task = new SimpleTask(exchange, callback);
-        } else {
-            task = new RedeliveryTask(exchange, callback);
-        }
+        Runnable task = taskFactory.acquire(exchange, callback);
+
         // Run it
         if (exchange.isTransacted()) {
             reactiveExecutor.scheduleSync(task);
@@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     /**
      * Simple task to perform calling the processor with no redelivery support
      */
-    protected class SimpleTask implements Runnable, AsyncCallback {
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
-        private boolean first = true;
+    protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback {
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
+        private boolean first;
 
-        SimpleTask(Exchange exchange, AsyncCallback callback) {
+        public SimpleTask() {
+        }
+
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = (ExtendedExchange) exchange;
             this.callback = callback;
+            this.first = true;
         }
 
         @Override
@@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             return "SimpleTask";
         }
 
+        public void reset() {
+            this.exchange = null;
+            this.callback = null;
+            this.first = true;
+        }
+
         @Override
         public void done(boolean doneSync) {
             // the run method decides what to do when we are done
@@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
             if (exchange.isInterrupted()) {
@@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 }
                 exchange.setRouteStop(true);
                 // we should not continue routing so call callback
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
@@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 onExceptionOccurred();
                 prepareExchangeAfterFailure(exchange);
                 // we do not support redelivery so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             } else if (first) {
                 // first time call the target processor
                 first = false;
                 outputAsync.process(exchange, this);
             } else {
                 // we are done so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             }
         }
 
@@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     /**
      * Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask)
      */
-    protected class RedeliveryTask implements Runnable {
-        private final Exchange original;
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
+    protected class RedeliveryTask implements PooledExchangeTask, Runnable {
+        // state
+        private Exchange original;
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
         private int redeliveryCounter;
         private long redeliveryDelay;
-        private Predicate retryWhilePredicate;
 
         // default behavior which can be overloaded on a per exception basis
+        private Predicate retryWhilePredicate;
         private RedeliveryPolicy currentRedeliveryPolicy;
         private Processor failureProcessor;
         private Processor onRedeliveryProcessor;
@@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         private boolean useOriginalInMessage;
         private boolean useOriginalInBody;
 
-        public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
+        public RedeliveryTask() {
+        }
+
+        @Override
+        public String toString() {
+            return "RedeliveryTask";
+        }
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.retryWhilePredicate = retryWhilePolicy;
             this.currentRedeliveryPolicy = redeliveryPolicy;
             this.handledPredicate = getDefaultHandledPredicate();
@@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             this.useOriginalInBody = useOriginalBodyPolicy;
             this.onRedeliveryProcessor = redeliveryProcessor;
             this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
-
             // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated Exchange
             this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
@@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         }
 
         @Override
-        public String toString() {
-            return "RedeliveryTask";
+        public void reset() {
+            this.retryWhilePredicate = null;
+            this.currentRedeliveryPolicy = null;
+            this.handledPredicate = null;
+            this.continuedPredicate = null;
+            this.useOriginalInMessage = false;
+            this.useOriginalInBody = false;
+            this.onRedeliveryProcessor = null;
+            this.onExceptionProcessor = null;
+            this.original = null;
+            this.exchange = null;
+            this.callback = null;
+            this.redeliveryCounter = 0;
+            this.redeliveryDelay = 0;
         }
 
         /**
@@ -635,7 +677,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
@@ -644,7 +688,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             } catch (Throwable e) {
                 // unexpected exception during running so break out
                 exchange.setException(e);
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
             }
         }
 
@@ -804,7 +850,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    reactiveExecutor.schedule(callback);
+                    AsyncCallback cb = callback;
+                    taskFactory.release(this);
+                    reactiveExecutor.schedule(cb);
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
                     reactiveExecutor.schedule(this);
@@ -1107,7 +1155,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                         }
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        reactiveExecutor.schedule(callback);
+                        AsyncCallback cb = callback;
+                        taskFactory.release(this);
+                        reactiveExecutor.schedule(cb);
                     }
                 });
             } else {
@@ -1126,7 +1176,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    reactiveExecutor.schedule(callback);
+                    AsyncCallback cb = callback;
+                    taskFactory.release(this);
+                    reactiveExecutor.schedule(cb);
                 }
             }
 
@@ -1503,8 +1555,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(output, outputAsync, deadLetter);
-
         // determine if redeliver is enabled or not
         redeliveryEnabled = determineIfRedeliveryIsEnabled();
         if (LOG.isTraceEnabled()) {
@@ -1531,6 +1581,28 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         // however if we dont then its less memory overhead (and a bit less cpu) of using the simple task
         simpleTask = deadLetter == null && !redeliveryEnabled && (exceptionPolicies == null || exceptionPolicies.isEmpty())
                 && onPrepareProcessor == null;
+
+        boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new RedeliveryTask();
+                }
+            };
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new RedeliveryTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
+        ServiceHelper.startService(taskFactory, output, outputAsync, deadLetter);
     }
 
     @Override
@@ -1542,6 +1614,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
+        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, taskFactory);
     }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
new file mode 100644
index 0000000..91c56cd
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+
+    protected CamelContext camelContext;
+    protected BlockingQueue<T> pool;
+    protected int capacity = 100;
+
+    @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        this.pool = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public int getSize() {
+        if (pool != null) {
+            return pool.size();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        pool.clear();
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+        pool.clear();
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements PooledObjectFactory.Statistics {
+
+        boolean statisticsEnabled;
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
new file mode 100644
index 0000000..5df6bbf
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new instance (does not pool).
+ */
+public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public int getSize() {
+        return 0;
+    }
+
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        // not in use
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        // not in use
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements Statistics {
+
+        boolean statisticsEnabled;
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}