You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/07 16:55:56 UTC

[camel] 01/01: CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 337b9c579a73cfdae52721f2f04f76ea616681a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Mar 7 17:54:51 2021 +0100

    CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine.
---
 .../java/org/apache/camel/spi/ExchangeFactory.java |  86 +-----------
 .../apache/camel/spi/InternalProcessorFactory.java |   4 +-
 .../org/apache/camel/spi/PooledObjectFactory.java  | 125 +++++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |   3 +
 .../camel/impl/engine/PooledExchangeFactory.java   |   5 +
 .../impl/engine/PrototypeExchangeFactory.java      | 111 ++-------------
 .../apache/camel/processor/PooledExchangeTask.java |  41 ++++++
 .../camel/processor/PooledExchangeTaskFactory.java |  56 ++++++++
 .../apache/camel/processor/PooledTaskFactory.java  |  64 +++++++++
 .../camel/processor/PrototypeTaskFactory.java      |  48 +++++++
 .../errorhandler/RedeliveryErrorHandler.java       | 137 ++++++++++++++-----
 .../camel/support/PooledObjectFactorySupport.java  | 152 +++++++++++++++++++++
 .../support/PrototypeObjectFactorySupport.java     | 139 +++++++++++++++++++
 13 files changed, 756 insertions(+), 215 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index fcedd54..29b3fdb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.spi;
 
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.NonManagedService;
-import org.apache.camel.Service;
 
 /**
  * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the
@@ -36,50 +34,7 @@ import org.apache.camel.Service;
  * The factory is pluggable which allows to use different strategies. The default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
  */
-public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService, RouteIdAware {
-
-    /**
-     * Utilization statistics of the this factory.
-     */
-    interface Statistics {
-
-        /**
-         * Number of new exchanges created.
-         */
-        long getCreatedCounter();
-
-        /**
-         * Number of exchanges acquired (reused) when using pooled factory.
-         */
-        long getAcquiredCounter();
-
-        /**
-         * Number of exchanges released back to pool
-         */
-        long getReleasedCounter();
-
-        /**
-         * Number of exchanges discarded (thrown away) such as if no space in cache pool.
-         */
-        long getDiscardedCounter();
-
-        /**
-         * Reset the counters
-         */
-        void reset();
-
-        /**
-         * Whether statistics is enabled.
-         */
-        boolean isStatisticsEnabled();
-
-        /**
-         * Sets whether statistics is enabled.
-         *
-         * @param statisticsEnabled <tt>true</tt> to enable
-         */
-        void setStatisticsEnabled(boolean statisticsEnabled);
-    }
+public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
 
     /**
      * Service factory key.
@@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, CamelContextAware, NonManagedS
     }
 
     /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
-     */
-    int getCapacity();
-
-    /**
-     * The current number of exchanges in the pool
-     */
-    int getSize();
-
-    /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
-     */
-    void setCapacity(int capacity);
-
-    /**
-     * Whether statistics is enabled.
-     */
-    boolean isStatisticsEnabled();
-
-    /**
-     * Whether statistics is enabled.
-     */
-    void setStatisticsEnabled(boolean statisticsEnabled);
-
-    /**
-     * Reset the statistics
-     */
-    void resetStatistics();
-
-    /**
-     * Purges the internal cache (if pooled)
-     */
-    void purge();
-
-    /**
-     * Gets the usage statistics
+     * Whether the factory is pooled.
      */
-    Statistics getStatistics();
+    boolean isPooled();
 
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index b3f8868..8a56f21 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -27,7 +27,9 @@ import org.apache.camel.Route;
 
 /**
  * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is
- * used to have loose coupling between the modules in core. Camel user user should only use {@link ProcessorFactory}.
+ * used to have loose coupling between the modules in core.
+ *
+ * Camel end user should NOT use this, but use {@link ProcessorFactory} instead.
  *
  * @see ProcessorFactory
  */
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
new file mode 100644
index 0000000..db4c0d1
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
+/**
+ * Factory for pooled objects or tasks.
+ */
+public interface PooledObjectFactory<T> extends Service, CamelContextAware {
+
+    /**
+     * Utilization statistics of the this factory.
+     */
+    interface Statistics {
+
+        /**
+         * Number of new exchanges created.
+         */
+        long getCreatedCounter();
+
+        /**
+         * Number of exchanges acquired (reused) when using pooled factory.
+         */
+        long getAcquiredCounter();
+
+        /**
+         * Number of exchanges released back to pool
+         */
+        long getReleasedCounter();
+
+        /**
+         * Number of exchanges discarded (thrown away) such as if no space in cache pool.
+         */
+        long getDiscardedCounter();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
+
+    /**
+     * The current number of objects in the pool
+     */
+    int getSize();
+
+    /**
+     * The capacity the pool uses for storing objects. The default capacity is 100.
+     */
+    int getCapacity();
+
+    /**
+     * The capacity the pool uses for storing objects. The default capacity is 100.
+     */
+    void setCapacity(int capacity);
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal cache (if pooled)
+     */
+    void purge();
+
+    /**
+     * Gets the usage statistics
+     */
+    Statistics getStatistics();
+
+    /**
+     * Acquires an object from the pool (if any)
+     *
+     * @return the object or <tt>null</tt> if the pool is empty
+     */
+    T acquire();
+
+    /**
+     * Releases the object back to the pool
+     *
+     * @param  t the object
+     * @return   true if released into the pool, or false if something went wrong and the object was discarded
+     */
+    boolean release(T t);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d7e6dd4..45fcf6d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends BaseService
         }
         bootstraps.clear();
 
+        if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) {
+            LOG.info("Pooled mode enabled. Camel pools and reuses objects to reduce JVM object allocations.");
+        }
         if (isLightweight()) {
             LOG.info("Lightweight mode enabled. Performing optimizations and memory reduction.");
             ReifierStrategy.clearReifiers();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 4079a46..b1647b5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -168,6 +168,11 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
     }
 
     @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
     protected void doStop() throws Exception {
         exchangeFactoryManager.removeExchangeFactory(this);
         logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index fb17cb0..eef1ea7 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.PooledObjectFactorySupport;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public class PrototypeExchangeFactory extends ServiceSupport implements ExchangeFactory {
+public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchange> implements ExchangeFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(PrototypeExchangeFactory.class);
 
-    final UtilizationStatistics statistics = new UtilizationStatistics();
     final Consumer consumer;
-    CamelContext camelContext;
     ExchangeFactoryManager exchangeFactoryManager;
     String routeId;
 
@@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
 
     @Override
     protected void doBuild() throws Exception {
+        super.doBuild();
         this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
     }
 
@@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
         PrototypeExchangeFactory answer = new PrototypeExchangeFactory(consumer);
         answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
@@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
+    public Exchange acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
     public Exchange create(boolean autoRelease) {
         if (statistics.isStatisticsEnabled()) {
             statistics.created.increment();
@@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
     }
 
     @Override
-    public boolean isStatisticsEnabled() {
-        return statistics.isStatisticsEnabled();
-    }
-
-    @Override
-    public void setStatisticsEnabled(boolean statisticsEnabled) {
-        statistics.setStatisticsEnabled(statisticsEnabled);
-    }
-
-    @Override
-    public int getCapacity() {
-        return 0;
-    }
-
-    @Override
-    public int getSize() {
-        return 0;
-    }
-
-    @Override
-    public void setCapacity(int capacity) {
-        // not in use
-    }
-
-    @Override
     public void resetStatistics() {
         statistics.reset();
     }
 
     @Override
-    public void purge() {
-        // not in use
-    }
-
-    @Override
-    public Statistics getStatistics() {
-        return statistics;
+    public boolean isPooled() {
+        return false;
     }
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.addExchangeFactory(this);
         }
@@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.removeExchangeFactory(this);
         }
@@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
         }
     }
 
-    /**
-     * Represents utilization statistics
-     */
-    final class UtilizationStatistics implements ExchangeFactory.Statistics {
-
-        boolean statisticsEnabled;
-        final LongAdder created = new LongAdder();
-        final LongAdder acquired = new LongAdder();
-        final LongAdder released = new LongAdder();
-        final LongAdder discarded = new LongAdder();
-
-        @Override
-        public void reset() {
-            created.reset();
-            acquired.reset();
-            released.reset();
-            discarded.reset();
-        }
-
-        @Override
-        public long getCreatedCounter() {
-            return created.longValue();
-        }
-
-        @Override
-        public long getAcquiredCounter() {
-            return acquired.longValue();
-        }
-
-        @Override
-        public long getReleasedCounter() {
-            return released.longValue();
-        }
-
-        @Override
-        public long getDiscardedCounter() {
-            return discarded.longValue();
-        }
-
-        @Override
-        public boolean isStatisticsEnabled() {
-            return statisticsEnabled;
-        }
-
-        @Override
-        public void setStatisticsEnabled(boolean statisticsEnabled) {
-            this.statisticsEnabled = statisticsEnabled;
-        }
-    }
-
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
new file mode 100644
index 0000000..d4e0226
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * A task that EIPs and internal routing engine uses to store state when processing an {@link Exchange}.
+ *
+ * @see org.apache.camel.processor.PooledExchangeTaskFactory
+ */
+public interface PooledExchangeTask extends Runnable {
+
+    /**
+     * Prepares the task for the given exchange and its callback
+     *
+     * @param exchange the exchange
+     * @param callback the callback
+     */
+    void prepare(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Resets the task after its done and can be reused for another exchange.
+     */
+    void reset();
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
new file mode 100644
index 0000000..b90e9f5
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.PooledObjectFactory;
+
+/**
+ * Factory to create {@link PooledExchangeTask}.
+ *
+ * @see PooledExchangeTask
+ */
+public interface PooledExchangeTaskFactory extends PooledObjectFactory<PooledExchangeTask> {
+
+    /**
+     * Creates a new task to use for processing the exchange.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask create(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Attempts to acquire a pooled task to use for processing the exchange, if not possible then a new task is created.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Releases the task after its done being used
+     *
+     * @param  task the task
+     * @return      true if the task was released, and false if the task failed to be released or no space in pool, and
+     *              the task was discarded.
+     */
+    boolean release(PooledExchangeTask task);
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
new file mode 100644
index 0000000..c1ce72e
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    @Override
+    public PooledExchangeTask acquire() {
+        return pool.poll();
+    }
+
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        PooledExchangeTask task = acquire();
+        if (task == null) {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
+            task = create(exchange, callback);
+        } else {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
+            }
+        }
+        task.prepare(exchange, callback);
+        return task;
+    }
+
+    @Override
+    public boolean release(PooledExchangeTask task) {
+        boolean inserted = pool.offer(task);
+        if (statistics.isStatisticsEnabled()) {
+            if (inserted) {
+                statistics.released.increment();
+            } else {
+                statistics.discarded.increment();
+            }
+        }
+        return inserted;
+    }
+
+    @Override
+    public String toString() {
+        return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
new file mode 100644
index 0000000..0c5c444
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PrototypeObjectFactorySupport;
+
+public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    @Override
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        PooledExchangeTask task = create(exchange, callback);
+        task.prepare(exchange, callback);
+        return task;
+    }
+
+    @Override
+    public PooledExchangeTask acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
+    public boolean release(PooledExchangeTask pooledTask) {
+        // not pooled
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PrototypeTaskFactory";
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 3c0eaf3..39d3658 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -38,6 +38,10 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
@@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class);
 
+    // factory
+    protected PooledExchangeTaskFactory taskFactory;
+
     // state
     protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
@@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not)
-        Runnable task;
-        if (simpleTask) {
-            task = new SimpleTask(exchange, callback);
-        } else {
-            task = new RedeliveryTask(exchange, callback);
-        }
+        Runnable task = taskFactory.acquire(exchange, callback);
+
         // Run it
         if (exchange.isTransacted()) {
             reactiveExecutor.scheduleSync(task);
@@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     /**
      * Simple task to perform calling the processor with no redelivery support
      */
-    protected class SimpleTask implements Runnable, AsyncCallback {
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
-        private boolean first = true;
+    protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback {
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
+        private boolean first;
 
-        SimpleTask(Exchange exchange, AsyncCallback callback) {
+        public SimpleTask() {
+        }
+
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = (ExtendedExchange) exchange;
             this.callback = callback;
+            this.first = true;
         }
 
         @Override
@@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             return "SimpleTask";
         }
 
+        public void reset() {
+            this.exchange = null;
+            this.callback = null;
+            this.first = true;
+        }
+
         @Override
         public void done(boolean doneSync) {
             // the run method decides what to do when we are done
@@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
             if (exchange.isInterrupted()) {
@@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 }
                 exchange.setRouteStop(true);
                 // we should not continue routing so call callback
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
@@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 onExceptionOccurred();
                 prepareExchangeAfterFailure(exchange);
                 // we do not support redelivery so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             } else if (first) {
                 // first time call the target processor
                 first = false;
                 outputAsync.process(exchange, this);
             } else {
                 // we are done so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             }
         }
 
@@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
     /**
      * Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask)
      */
-    protected class RedeliveryTask implements Runnable {
-        private final Exchange original;
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
+    protected class RedeliveryTask implements PooledExchangeTask, Runnable {
+        // state
+        private Exchange original;
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
         private int redeliveryCounter;
         private long redeliveryDelay;
-        private Predicate retryWhilePredicate;
 
         // default behavior which can be overloaded on a per exception basis
+        private Predicate retryWhilePredicate;
         private RedeliveryPolicy currentRedeliveryPolicy;
         private Processor failureProcessor;
         private Processor onRedeliveryProcessor;
@@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         private boolean useOriginalInMessage;
         private boolean useOriginalInBody;
 
-        public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
+        public RedeliveryTask() {
+        }
+
+        @Override
+        public String toString() {
+            return "RedeliveryTask";
+        }
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.retryWhilePredicate = retryWhilePolicy;
             this.currentRedeliveryPolicy = redeliveryPolicy;
             this.handledPredicate = getDefaultHandledPredicate();
@@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             this.useOriginalInBody = useOriginalBodyPolicy;
             this.onRedeliveryProcessor = redeliveryProcessor;
             this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
-
             // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated Exchange
             this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
@@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         }
 
         @Override
-        public String toString() {
-            return "RedeliveryTask";
+        public void reset() {
+            this.retryWhilePredicate = null;
+            this.currentRedeliveryPolicy = null;
+            this.handledPredicate = null;
+            this.continuedPredicate = null;
+            this.useOriginalInMessage = false;
+            this.useOriginalInBody = false;
+            this.onRedeliveryProcessor = null;
+            this.onExceptionProcessor = null;
+            this.original = null;
+            this.exchange = null;
+            this.callback = null;
+            this.redeliveryCounter = 0;
+            this.redeliveryDelay = 0;
         }
 
         /**
@@ -635,7 +677,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
@@ -644,7 +688,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             } catch (Throwable e) {
                 // unexpected exception during running so break out
                 exchange.setException(e);
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
             }
         }
 
@@ -804,7 +850,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    reactiveExecutor.schedule(callback);
+                    AsyncCallback cb = callback;
+                    taskFactory.release(this);
+                    reactiveExecutor.schedule(cb);
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
                     reactiveExecutor.schedule(this);
@@ -1107,7 +1155,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                         }
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        reactiveExecutor.schedule(callback);
+                        AsyncCallback cb = callback;
+                        taskFactory.release(this);
+                        reactiveExecutor.schedule(cb);
                     }
                 });
             } else {
@@ -1126,7 +1176,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    reactiveExecutor.schedule(callback);
+                    AsyncCallback cb = callback;
+                    taskFactory.release(this);
+                    reactiveExecutor.schedule(cb);
                 }
             }
 
@@ -1503,8 +1555,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(output, outputAsync, deadLetter);
-
         // determine if redeliver is enabled or not
         redeliveryEnabled = determineIfRedeliveryIsEnabled();
         if (LOG.isTraceEnabled()) {
@@ -1531,6 +1581,28 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         // however if we dont then its less memory overhead (and a bit less cpu) of using the simple task
         simpleTask = deadLetter == null && !redeliveryEnabled && (exceptionPolicies == null || exceptionPolicies.isEmpty())
                 && onPrepareProcessor == null;
+
+        boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new RedeliveryTask();
+                }
+            };
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new RedeliveryTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
+        ServiceHelper.startService(taskFactory, output, outputAsync, deadLetter);
     }
 
     @Override
@@ -1542,6 +1614,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
+        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, taskFactory);
     }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
new file mode 100644
index 0000000..91c56cd
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+
+    protected CamelContext camelContext;
+    protected BlockingQueue<T> pool;
+    protected int capacity = 100;
+
+    @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        this.pool = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public int getSize() {
+        if (pool != null) {
+            return pool.size();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        pool.clear();
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+        pool.clear();
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements PooledObjectFactory.Statistics {
+
+        boolean statisticsEnabled;
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
new file mode 100644
index 0000000..5df6bbf
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new instance (does not pool).
+ */
+public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public int getSize() {
+        return 0;
+    }
+
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        // not in use
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        // not in use
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements Statistics {
+
+        boolean statisticsEnabled;
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}