You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/11/22 16:33:20 UTC

[camel] 03/11: Full asynchronous engine with low stack depth

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d17a41fb6182e2991c31cb256bc093bace6b5e26
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Sat Oct 20 00:48:12 2018 +0200

    Full asynchronous engine with low stack depth
---
 .../camel/spi/AsyncProcessorAwaitManager.java      |   19 +-
 camel-core/pom.xml                                 |    5 +
 camel-core/src/main/docs/eips/delay-eip.adoc       |    2 +-
 .../apache/camel/component/file/FileConsumer.java  |   23 +-
 .../apache/camel/component/seda/SedaProducer.java  |    1 -
 .../camel/component/timer/TimerConsumer.java       |    5 +
 .../impl/DefaultAsyncProcessorAwaitManager.java    |   32 +-
 .../apache/camel/impl/DefaultProducerCache.java    |   12 +-
 .../org/apache/camel/model/DelayDefinition.java    |   12 +-
 .../camel/processor/CamelInternalProcessor.java    |  103 +-
 .../camel/processor/ConvertBodyProcessor.java      |   28 +-
 .../camel/processor/DelayProcessorSupport.java     |   23 +-
 .../camel/processor/DelegateAsyncProcessor.java    |   17 +-
 .../camel/processor/DelegateSyncProcessor.java     |    4 +-
 .../org/apache/camel/processor/LoopProcessor.java  |  176 +-
 .../apache/camel/processor/MulticastProcessor.java |  784 +++------
 .../java/org/apache/camel/processor/Pipeline.java  |  165 +-
 .../camel/processor/RedeliveryErrorHandler.java    | 1757 +++++++++-----------
 .../org/apache/camel/processor/Resequencer.java    |   21 +-
 .../org/apache/camel/processor/RoutingSlip.java    |    4 +-
 .../org/apache/camel/processor/SendProcessor.java  |   10 +-
 .../processor/SharedCamelInternalProcessor.java    |   50 +-
 .../java/org/apache/camel/processor/Splitter.java  |   14 +-
 .../apache/camel/processor/ThroughputLogger.java   |   11 +-
 .../org/apache/camel/processor/TryProcessor.java   |   90 +-
 .../processor/interceptor/DefaultChannel.java      |   13 -
 .../org/apache/camel/reifier/DelayReifier.java     |    2 +-
 .../apache/camel/support/AsyncProcessorHelper.java |   18 +-
 .../org/apache/camel/support/ReactiveHelper.java   |  156 ++
 .../component/seda/SedaBlockWhenFullTest.java      |    4 +-
 .../seda/SedaDefaultBlockWhenFullTest.java         |    4 +-
 .../camel/impl/MultipleConsumersSupportTest.java   |    8 +
 ...ngExchangesAsyncDelayShutdownGracefulTest.java} |   31 +-
 .../impl/PendingExchangesShutdownGracefulTest.java |    2 +-
 ...ndingExchangesTwoRouteShutdownGracefulTest.java |    4 +-
 .../EventNotifierFailureHandledEventsTest.java     |    8 +-
 .../CharlesSplitAndTryCatchRollbackIssueTest.java  |    4 +-
 ...thAggregationStrategyThrowingExceptionTest.java |   11 +-
 ...terParallelRuntimeExceptionInHasNextOrNext.java |   12 -
 ...ThreadsRejectedExecutionWithDeadLetterTest.java |    4 +-
 .../apache/camel/processor/LoopAsyncCopyTest.java  |    3 -
 .../camel/processor/LoopAsyncNoCopyTest.java       |    3 -
 .../MulticastParallelStopOnExceptionTest.java      |    3 +-
 .../processor/MulticastParallelStreamingTest.java  |    2 +-
 .../processor/MulticastStopOnExceptionTest.java    |    2 +-
 .../apache/camel/processor/NavigateRouteTest.java  |   22 +-
 ...pientListContextScopedOnExceptionIssueTest.java |    7 +
 .../processor/ShutdownCompleteAllTasksTest.java    |    2 +-
 .../ShutdownCompleteCurrentTaskOnlyTest.java       |    4 +-
 .../SplitterParallelStopOnExceptionTest.java       |    2 +-
 .../processor/SplitterStopOnExceptionTest.java     |    2 +-
 .../processor/ThreadsRejectedExecutionTest.java    |    2 +-
 .../camel/processor/UnmarshalProcessorTest.java    |    2 +
 .../async/AsyncEndpointCustomInterceptorTest.java  |    9 +-
 .../AsyncEndpointRecipientListParallel5Test.java   |    3 +-
 .../AsyncEndpointRecipientListParallelTest.java    |    3 +-
 .../routingslip/RoutingSlipEventNotifierTest.java  |    1 +
 .../java/org/apache/camel/util/FilterIterator.java |   88 +
 .../main/java/org/apache/camel/util/IOHelper.java  |   10 +-
 .../util/concurrent/AsyncCompletionService.java    |  167 ++
 .../concurrent/AsyncCompletionServiceTest.java     |  196 +++
 .../processor/ShutdownCompleteAllTasksTest.xml     |    2 +-
 .../ShutdownCompleteCurrentTaskOnlyTest.xml        |    2 +-
 63 files changed, 2050 insertions(+), 2136 deletions(-)

diff --git a/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index d4f8bdd..dfcfdce 100644
--- a/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-api/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -19,6 +19,7 @@ package org.apache.camel.spi;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.StaticService;
 
@@ -122,22 +123,12 @@ public interface AsyncProcessorAwaitManager extends StaticService {
     }
 
     /**
-     * Registers the exchange to await for the callback to be triggered by another thread which has taken over processing
-     * this exchange. The current thread will await until that callback happens in the future (blocking until this happens).
+     * Process the given exchange synchronously.
      *
-     * @param exchange   the exchange
-     * @param latch      the latch used to wait for other thread to signal when its done
+     * @param processor the async processor to call
+     * @param exchange the exchange to process
      */
-    void await(Exchange exchange, CountDownLatch latch);
-
-    /**
-     * Triggered when the other thread is done processing the exchange, to signal to the waiting thread is done, and can take
-     * over control to further process the exchange.
-     *
-     * @param exchange   the exchange
-     * @param latch      the latch used to wait for other thread to signal when its done
-     */
-    void countDown(Exchange exchange, CountDownLatch latch);
+    void process(final AsyncProcessor processor, final Exchange exchange);
 
     /**
      * Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing
diff --git a/camel-core/pom.xml b/camel-core/pom.xml
index 51c6e83..6b08aaf 100644
--- a/camel-core/pom.xml
+++ b/camel-core/pom.xml
@@ -192,6 +192,11 @@
 
     <!-- testing -->
     <dependency>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>woodstox-core-asl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/camel-core/src/main/docs/eips/delay-eip.adoc b/camel-core/src/main/docs/eips/delay-eip.adoc
index 00684a0..b1a3f4b 100644
--- a/camel-core/src/main/docs/eips/delay-eip.adoc
+++ b/camel-core/src/main/docs/eips/delay-eip.adoc
@@ -11,7 +11,7 @@ The Delay EIP supports 3 options which are listed below:
 |===
 | Name | Description | Default | Type
 | *executorServiceRef* | Refers to a custom Thread Pool if asyncDelay has been enabled. |  | String
-| *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
+| *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | true | Boolean
 | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean
 |===
 // eip options: END
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 65aebda..446b6ef 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -22,6 +22,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,11 +49,8 @@ public class FileConsumer extends GenericFileConsumer<File> {
         this.endpointPath = endpoint.getConfiguration().getDirectory();
 
         if (endpoint.getExtendedAttributes() != null) {
-            this.extendedAttributes = new HashSet<>();
-
-            for (String attribute : endpoint.getExtendedAttributes().split(",")) {
-                extendedAttributes.add(attribute);
-            }
+            List<String> attributes = Arrays.asList(endpoint.getExtendedAttributes().split(","));
+            this.extendedAttributes = new HashSet<>(attributes);
         }
     }
 
@@ -87,7 +85,7 @@ public class FileConsumer extends GenericFileConsumer<File> {
         }
         List<File> files = Arrays.asList(dirFiles);
         if (getEndpoint().isPreSort()) {
-            Collections.sort(files, (a, b) -> a.getAbsoluteFile().compareTo(a.getAbsoluteFile()));
+            files.sort(Comparator.comparing(File::getAbsoluteFile));
         }
 
         for (File file : dirFiles) {
@@ -99,7 +97,7 @@ public class FileConsumer extends GenericFileConsumer<File> {
             // trace log as Windows/Unix can have different views what the file is?
             if (log.isTraceEnabled()) {
                 log.trace("Found file: {} [isAbsolute: {}, isDirectory: {}, isFile: {}, isHidden: {}]",
-                        new Object[]{file, file.isAbsolute(), file.isDirectory(), file.isFile(), file.isHidden()});
+                        file, file.isAbsolute(), file.isDirectory(), file.isFile(), file.isHidden());
             }
 
             // creates a generic file
@@ -205,13 +203,12 @@ public class FileConsumer extends GenericFileConsumer<File> {
 
         // compute the file path as relative to the starting directory
         File path;
-        String endpointNormalized = FileUtil.normalizePath(endpointPath);
-        if (file.getPath().startsWith(endpointNormalized + File.separator)) {
-            // skip duplicate endpoint path
-            path = new File(StringHelper.after(file.getPath(), endpointNormalized + File.separator));
-        } else {
-            path = new File(file.getPath());
+        String endpointNormalizedSep = FileUtil.normalizePath(endpointPath) + File.separator;
+        String p = file.getPath();
+        if (p.startsWith(endpointNormalizedSep)) {
+            p = p.substring(endpointNormalizedSep.length());
         }
+        path = new File(p);
 
         if (path.getParent() != null) {
             answer.setRelativeFilePath(path.getParent() + File.separator + file.getName());
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index 38a059b..2bc5b0c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -99,7 +99,6 @@ public class SedaProducer extends DefaultAsyncProducer {
                 }
             });
 
-            log.trace("Adding Exchange to queue: {}", copy);
             try {
                 // do not copy as we already did the copy
                 addToQueue(copy, false);
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index eab3bb7..1c81cb0 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.StartupListener;
 import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.ReactiveHelper;
 
 /**
  * The timer consumer.
@@ -174,6 +175,10 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S
     }
 
     protected void sendTimerExchange(long counter) {
+        ReactiveHelper.schedule(() -> doSendTimerExchange(counter), "Send timer exchange");
+    }
+
+    protected void doSendTimerExchange(long counter) {
         final Exchange exchange = endpoint.createExchange();
         exchange.setProperty(Exchange.TIMER_COUNTER, counter);
         exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index d78e852..774ccd7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -23,8 +23,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.NamedNode;
@@ -33,6 +36,7 @@ import org.apache.camel.processor.DefaultExchangeFormatter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.ServiceSupport;
 
 public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
@@ -59,8 +63,33 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         this.exchangeFormatter = formatter;
     }
 
-    @Override
+    /**
+     * Calls the async version of the processor's process method and waits
+     * for it to complete before returning. This can be used by {@link AsyncProcessor}
+     * objects to implement their sync version of the process method.
+     * <p/>
+     * <b>Important:</b> Use of this method is discouraged, as it is better to invoke the asynchronous
+     * {@link AsyncProcessor#process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method, whenever possible.
+     *
+     * @param processor the processor
+     * @param exchange  the exchange
+     * @throws Exception can be thrown if waiting is interrupted
+     */
+    public void process(final AsyncProcessor processor, final Exchange exchange) {
+        CountDownLatch latch = new CountDownLatch(1);
+        processor.process(exchange, doneSync -> countDown(exchange, latch));
+        if (latch.getCount() > 0) {
+            await(exchange, latch);
+        }
+    }
+
     public void await(Exchange exchange, CountDownLatch latch) {
+        // Early exit for pending reactive queued work
+        do {
+            if (latch.getCount() <= 0) {
+                return;
+            }
+        } while (ReactiveHelper.executeFromQueue());
         log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
@@ -98,7 +127,6 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         }
     }
 
-    @Override
     public void countDown(Exchange exchange, CountDownLatch latch) {
         log.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
         latch.countDown();
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
index 5638f9d..454215e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
@@ -298,17 +298,19 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
         }
 
         try {
-            StopWatch sw = null;
+            // record timing for sending the exchange using the producer
+            StopWatch watch;
             if (eventNotifierEnabled && exchange != null) {
                 boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
                 if (sending) {
-                    sw = new StopWatch();
+                    watch = new StopWatch();
+                } else {
+                    watch = null;
                 }
+            } else {
+                watch = null;
             }
 
-            // record timing for sending the exchange using the producer
-            final StopWatch watch = sw;
-
             // invoke the callback
             return producerCallback.doInAsyncProducer(producer, exchange, doneSync -> {
                 try {
diff --git a/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
index 40cebdc..8d6c7f5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
@@ -41,7 +41,7 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
     private ExecutorService executorService;
     @XmlAttribute
     private String executorServiceRef;
-    @XmlAttribute @Metadata(defaultValue = "false")
+    @XmlAttribute @Metadata(defaultValue = "true")
     private Boolean asyncDelayed;
     @XmlAttribute @Metadata(defaultValue = "true")
     private Boolean callerRunsWhenRejected;
@@ -68,7 +68,7 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
         return "Delay[" + getExpression() + " -> " + getOutputs() + "]";
     }
 
-    // Fluent API
+   // Fluent API
     // -------------------------------------------------------------------------
 
     /**
@@ -104,6 +104,14 @@ public class DelayDefinition extends NoOutputExpressionNode implements ExecutorS
     }
 
     /**
+     * Enables synchronous delay which means the thread <b>will</b> block while delaying.
+     */
+    public DelayDefinition syncDelayed() {
+        setAsyncDelayed(false);
+        return this;
+    }
+
+    /**
      * To use a custom Thread Pool if asyncDelay has been enabled.
      */
     public DelayDefinition executorService(ExecutorService executorService) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index d2e40ca..ce03628 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -46,6 +46,7 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     }
 
     @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
+    public boolean process(Exchange exchange, AsyncCallback ocallback) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -134,7 +135,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
         if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
-            callback.done(true);
+            ocallback.done(true);
             return true;
         }
 
@@ -148,17 +149,37 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                callback.done(true);
+                ocallback.done(true);
                 return true;
             }
         }
 
         // create internal callback which will execute the advices in reverse order when done
-        callback = new InternalCallback(states, exchange, callback);
+        AsyncCallback callback = doneSync -> {
+            try {
+                for (int i = advices.size() - 1; i >= 0; i--) {
+                    CamelInternalProcessorAdvice task = advices.get(i);
+                    Object state = states[i];
+                    try {
+                        task.after(exchange, state);
+                    } catch (Throwable e) {
+                        exchange.setException(e);
+                        // allow all advices to complete even if there was an exception
+                    }
+                }
+            } finally {
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ START +++
+                // ----------------------------------------------------------
+                // callback must be called
+                ReactiveHelper.callback(ocallback);
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ END +++
+                // ----------------------------------------------------------
+            }
+        };
 
-        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
-        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
-        if (exchange.isTransacted() || synchronous != null) {
+        if (exchange.isTransacted()) {
             // must be synchronized for transacted exchanges
             if (log.isTraceEnabled()) {
                 if (exchange.isTransacted()) {
@@ -196,21 +217,23 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             if (log.isTraceEnabled()) {
                 log.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
             }
-            boolean sync = processor.process(exchange, async);
+            processor.process(exchange, async);
             // ----------------------------------------------------------
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            // execute any after processor work (in current thread, not in the callback)
-            if (uow != null) {
-                uow.afterProcess(processor, exchange, callback, sync);
-            }
+            ReactiveHelper.scheduleLast(() -> {
+                // execute any after processor work (in current thread, not in the callback)
+                if (uow != null) {
+                    uow.afterProcess(processor, exchange, callback, false);
+                }
 
-            if (log.isTraceEnabled()) {
-                log.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
-                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
-            }
-            return sync;
+                if (log.isTraceEnabled()) {
+                    log.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
+                             exchange.getExchangeId(), exchange);
+                }
+            }, "CamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
+            return false;
         }
     }
 
@@ -220,52 +243,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
     }
 
     /**
-     * Internal callback that executes the after advices.
-     */
-    private final class InternalCallback implements AsyncCallback {
-
-        private final Object[] states;
-        private final Exchange exchange;
-        private final AsyncCallback callback;
-
-        private InternalCallback(Object[] states, Exchange exchange, AsyncCallback callback) {
-            this.states = states;
-            this.exchange = exchange;
-            this.callback = callback;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void done(boolean doneSync) {
-            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
-            // so you can step straight to the finally block and invoke the callback
-
-            // we should call after in reverse order
-            try {
-                for (int i = advices.size() - 1; i >= 0; i--) {
-                    CamelInternalProcessorAdvice task = advices.get(i);
-                    Object state = states[i];
-                    try {
-                        task.after(exchange, state);
-                    } catch (Throwable e) {
-                        exchange.setException(e);
-                        // allow all advices to complete even if there was an exception
-                    }
-                }
-            } finally {
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ START +++
-                // ----------------------------------------------------------
-                // callback must be called
-                callback.done(doneSync);
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ END +++
-                // ----------------------------------------------------------
-            }
-        }
-    }
-
-    /**
      * Strategy to determine if we should continue processing the {@link Exchange}.
      */
     protected boolean continueProcessing(Exchange exchange) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index bd1b350..70b717d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -64,25 +64,18 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
         boolean out = exchange.hasOut();
         Message old = out ? exchange.getOut() : exchange.getIn();
 
         if (old.getBody() == null) {
             // only convert if there is a body
-            callback.done(true);
-            return true;
+            return;
         }
 
         if (exchange.getException() != null) {
             // do not convert if an exception has been thrown as if we attempt to convert and it also fails with a new
             // exception then it will override the existing exception
-            callback.done(true);
-            return true;
+            return;
         }
 
         String originalCharsetName = null;
@@ -93,14 +86,7 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
             exchange.setProperty(Exchange.CHARSET_NAME, charset);
         }
         // use mandatory conversion
-        Object value;
-        try {
-            value = old.getMandatoryBody(type);
-        } catch (Throwable e) {
-            exchange.setException(e);
-            callback.done(true);
-            return true;
-        }
+        Object value = old.getMandatoryBody(type);
 
         // create a new message container so we do not drag specialized message objects along
         // but that is only needed if the old message is a specialized message
@@ -126,7 +112,15 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
                 exchange.removeProperty(Exchange.CHARSET_NAME);
             }
         }
+    }
 
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
         callback.done(true);
         return true;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index 5202fce..1bba746 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -25,6 +25,8 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,12 +37,12 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * This implementation will block while waiting.
  */
-public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
+public abstract class DelayProcessorSupport extends DelegateAsyncProcessor implements ShutdownAware {
 
     private final CamelContext camelContext;
     private final ScheduledExecutorService executorService;
     private final boolean shutdownExecutorService;
-    private boolean asyncDelayed;
+    private boolean asyncDelayed = true;
     private boolean callerRunsWhenRejected = true;
     private final AtomicInteger delayedCount = new AtomicInteger(0);
 
@@ -247,6 +249,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(executorService, "executorService", this);
+        } else if (executorService != null) {
+            asyncDelayed = true;
         }
         super.doStart();
     }
@@ -258,4 +262,19 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         }
         super.doShutdown();
     }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return getDelayedCount();
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
index 29594d4..8f3d0f4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -86,22 +85,10 @@ public class DelegateAsyncProcessor extends ServiceSupport implements DelegatePr
         ServiceHelper.stopAndShutdownServices(processor);
     }
 
+    @Override
     public void process(Exchange exchange) throws Exception {
-        // inline org.apache.camel.support.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
-        // to optimize and reduce stacktrace lengths
         final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
-        final CountDownLatch latch = new CountDownLatch(1);
-        // call the asynchronous method and wait for it to be done
-        boolean sync = process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (!doneSync) {
-                    awaitManager.countDown(exchange, latch);
-                }
-            }
-        });
-        if (!sync) {
-            awaitManager.await(exchange, latch);
-        }
+        awaitManager.process(this, exchange);
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
index 653a736..963eec0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelegateSyncProcessor.java
@@ -64,9 +64,9 @@ public class DelegateSyncProcessor extends ServiceSupport implements org.apache.
             exchange.setException(e);
         } finally {
             // we are bridging a sync processor as async so callback with true
-            callback.done(true);
+            callback.done(false);
         }
-        return true;
+        return false;
     }
 
     @Override
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 6dc8888..92614a0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -16,17 +16,16 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 
@@ -49,133 +48,84 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        // use atomic integer to be able to pass reference and keep track on the values
-        AtomicInteger index = new AtomicInteger();
-        AtomicInteger count = new AtomicInteger();
-        AtomicBoolean doWhile = new AtomicBoolean();
-
         try {
-            if (expression != null) {
-                // Intermediate conversion to String is needed when direct conversion to Integer is not available
-                // but evaluation result is a textual representation of a numeric value.
-                String text = expression.evaluate(exchange, String.class);
-                int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
-                count.set(num);
-            } else {
-                boolean result = predicate.matches(exchange);
-                doWhile.set(result);
-            }
+
+            LoopState state = new LoopState(exchange, callback);
+
+            ReactiveHelper.scheduleMain(state);
+            return false;
+
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
             return true;
         }
+    }
 
-        // we hold on to the original Exchange in case it's needed for copies
-        final Exchange original = exchange;
-        
-        // per-iteration exchange
-        Exchange target = exchange;
-
-        // set the size before we start
-        if (predicate == null) {
-            exchange.setProperty(Exchange.LOOP_SIZE, count);
-        }
+    /**
+     * Class holding state for loop processing
+     */
+    class LoopState implements Runnable {
 
-        // loop synchronously
-        while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
-
-            // and prepare for next iteration
-            // if (!copy) target = exchange; else copy of original
-            target = prepareExchange(exchange, index.get(), original);
-            // the following process method will in the done method re-evaluate the predicate
-            // so we do not need to do it here as well
-            boolean sync = process(target, callback, index, count, doWhile, original);
-
-            if (!sync) {
-                log.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
-                // the remainder of the loop will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
-            }
+        final Exchange exchange;
+        final AsyncCallback callback;
+        Exchange current;
+        int index;
+        int count;
 
-            log.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
+        public LoopState(Exchange exchange, AsyncCallback callback) throws NoTypeConversionAvailableException {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.current = exchange;
 
-            // check for error if so we should break out
-            if (!continueProcessing(target, "so breaking out of loop", log)) {
-                break;
+            // evaluate expression / predicate
+            if (expression != null) {
+                // Intermediate conversion to String is needed when direct conversion to Integer is not available
+                // but evaluation result is a textual representation of a numeric value.
+                String text = expression.evaluate(exchange, String.class);
+                count = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
+                exchange.setProperty(Exchange.LOOP_SIZE, count);
             }
         }
 
-        // we are done so prepare the result
-        ExchangeHelper.copyResults(exchange, target);
-        log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-        callback.done(true);
-        return true;
-    }
-
-    protected boolean process(final Exchange exchange, final AsyncCallback callback,
-                              final AtomicInteger index, final AtomicInteger count, final AtomicBoolean doWhile,
-                              final Exchange original) {
-
-        // set current index as property
-        log.debug("LoopProcessor: iteration #{}", index.get());
-        exchange.setProperty(Exchange.LOOP_INDEX, index.get());
-
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                // increment counter after done
-                index.getAndIncrement();
-
-                // evaluate predicate for next loop
-                if (predicate != null && index.get() > 0) {
-                    try {
-                        boolean result = predicate.matches(exchange);
-                        doWhile.set(result);
-                    } catch (Exception e) {
-                        // break out looping due that exception
-                        exchange.setException(e);
-                        doWhile.set(false);
-                    }
-                }
-
-                // we only have to handle async completion of the loop
-                // (as the sync is done in the outer processor)
-                if (doneSync) {
-                    return;
-                }
-
-                Exchange target = exchange;
-
-                // continue looping asynchronously
-                while ((predicate != null && doWhile.get()) || (index.get() < count.get())) {
-
-                    // check for error if so we should break out
-                    if (!continueProcessing(target, "so breaking out of loop", log)) {
-                        break;
-                    }
+        @Override
+        public void run() {
+            try {
+                // check for an error; if there is one we should break out of the loop
+                boolean cont = continueProcessing(current, "so breaking out of loop", log);
+                boolean doWhile = predicate == null || predicate.matches(current);
+                boolean doLoop = expression == null || index < count;
 
+                // iterate
+                if (cont && doWhile && doLoop) {
                     // and prepare for next iteration
-                    target = prepareExchange(exchange, index.get(), original);
-
-                    // process again
-                    boolean sync = process(target, callback, index, count, doWhile, original);
-                    if (!sync) {
-                        log.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
-                        // the remainder of the routing slip will be completed async
-                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                        return;
-                    }
+                    current = prepareExchange(exchange, index);
+
+                    // set current index as property
+                    log.debug("LoopProcessor: iteration #{}", index);
+                    current.setProperty(Exchange.LOOP_INDEX, index);
+
+                    processor.process(current, doneSync -> {
+                        // increment counter after done
+                        index++;
+                        ReactiveHelper.schedule(this);
+                    });
+                } else {
+                    // we are done so prepare the result
+                    ExchangeHelper.copyResults(exchange, current);
+                    log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                    callback.done(false);
                 }
-
-                // we are done so prepare the result
-                ExchangeHelper.copyResults(original, target);
-                log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+            } catch (Exception e) {
+                log.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage());
+                exchange.setException(e);
                 callback.done(false);
             }
-        });
+        }
 
-        return sync;
+        public String toString() {
+            return "LoopState[" + exchange.getExchangeId() + "]";
+        }
     }
 
     /**
@@ -185,11 +135,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
      * @param index the index of the next iteration
      * @return the exchange to use
      */
-    protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
+    protected Exchange prepareExchange(Exchange exchange, int index) {
         if (copy) {
             // use a copy but let it reuse the same exchange id so it appear as one exchange
             // use the original exchange rather than the looping exchange (esp. with the async routing engine)
-            return ExchangeHelper.createCopy(original, true);
+            return ExchangeHelper.createCopy(exchange, true);
         } else {
             ExchangeHelper.prepareOutToIn(exchange);
             return exchange;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 731947f..5e6c991 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -22,19 +22,17 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
@@ -59,14 +57,15 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FilterIterator;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.KeyValueHolder;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
-import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
+import org.apache.camel.util.concurrent.AsyncCompletionService;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -215,615 +214,247 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        final AtomicReference<Exchange> result = new AtomicReference<>();
-        Iterable<ProcessorExchangePair> pairs = null;
-
+        Iterable<ProcessorExchangePair> pairs;
         try {
-            boolean sync = true;
-
             pairs = createProcessorExchangePairs(exchange);
-
-            if (isParallelProcessing()) {
-                // ensure an executor is set when running in parallel
-                ObjectHelper.notNull(executorService, "executorService", this);
-                doProcessParallel(exchange, result, pairs, isStreaming(), callback);
-            } else {
-                sync = doProcessSequential(exchange, result, pairs, callback);
-            }
-
-            if (!sync) {
-                // the remainder of the multicast will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
-            }
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
             // and do the done work
-            doDone(exchange, null, pairs, callback, true, false);
+            doDone(exchange, null, null, callback, true, false);
             return true;
         }
 
-        // multicasting was processed successfully
-        // and do the done work
-        Exchange subExchange = result.get() != null ? result.get() : null;
-        doDone(exchange, subExchange, pairs, callback, true, true);
-        return true;
-    }
-
-    protected void doProcessParallel(final Exchange original, final AtomicReference<Exchange> result, final Iterable<ProcessorExchangePair> pairs,
-                                     final boolean streaming, final AsyncCallback callback) throws Exception {
-
-        ObjectHelper.notNull(executorService, "ExecutorService", this);
-        ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);
-
-        final CompletionService<Exchange> completion;
-        if (streaming) {
-            // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
-            completion = new ExecutorCompletionService<>(executorService);
+        MulticastState state = new MulticastState(exchange, pairs, callback);
+        if (isParallelProcessing()) {
+            executorService.submit(() -> ReactiveHelper.schedule(state));
         } else {
-            // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
-            completion = new SubmitOrderedCompletionService<>(executorService);
-        }
-
-        final AtomicInteger total = new AtomicInteger(0);
-        final Iterator<ProcessorExchangePair> it = pairs.iterator();
-
-        if (it.hasNext()) {
-            // when parallel then aggregate on the fly
-            final AtomicBoolean running = new AtomicBoolean(true);
-            final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
-            final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
-            final AtomicReference<Exception> executionException = new AtomicReference<>();
-
-            // issue task to execute in separate thread so it can aggregate on-the-fly
-            // while we submit new tasks, and those tasks complete concurrently
-            // this allows us to optimize work and reduce memory consumption
-            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
-                    aggregationOnTheFlyDone, allTasksSubmitted, executionException);
-            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
-
-            log.trace("Starting to submit parallel tasks");
-            
-            try {
-                while (it.hasNext()) {
-                    final ProcessorExchangePair pair = it.next();
-                    // in case the iterator returns null then continue to next
-                    if (pair == null) {
-                        continue;
-                    }
-    
-                    final Exchange subExchange = pair.getExchange();
-                    updateNewExchange(subExchange, total.intValue(), pairs, it);
-    
-                    completion.submit(new Callable<Exchange>() {
-                        public Exchange call() throws Exception {
-                            // start the aggregation task at this stage only in order not to pile up too many threads
-                            if (aggregationTaskSubmitted.compareAndSet(false, true)) {
-                                // but only submit the aggregation task once
-                                aggregateExecutorService.submit(aggregateOnTheFlyTask);
-                            }
-    
-                            if (!running.get()) {
-                                // do not start processing the task if we are not running
-                                return subExchange;
-                            }
-    
-                            try {
-                                doProcessParallel(pair);
-                            } catch (Throwable e) {
-                                subExchange.setException(e);
-                            }
-    
-                            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
-                            Integer number = getExchangeIndex(subExchange);
-                            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, log);
-                            if (stopOnException && !continueProcessing) {
-                                // signal to stop running
-                                running.set(false);
-                                // throw caused exception
-                                if (subExchange.getException() != null) {
-                                    // wrap in exception to explain where it failed
-                                    CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
-                                    subExchange.setException(cause);
-                                }
-                            }
-    
-                            log.trace("Parallel processing complete for exchange: {}", subExchange);
-                            return subExchange;
-                        }
-                    });
-    
-                    total.incrementAndGet();
-                }
-            } catch (Throwable e) {
-                // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented.
-                // We have to catch the exception here otherwise the aggregator threads would pile up.
-                if (e instanceof Exception) {
-                    executionException.set((Exception) e);
-                } else {
-                    executionException.set(RuntimeCamelException.wrapRuntimeCamelException(e));
-                }
-                // and because of the exception we must signal we are done so the latch can open and let the other thread continue processing
-                log.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
-                log.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
-                aggregationOnTheFlyDone.countDown();
-            }
-
-            // signal all tasks has been submitted
-            log.trace("Signaling that all {} tasks has been submitted.", total.get());
-            allTasksSubmitted.set(true);
-
-            // its to hard to do parallel async routing so we let the caller thread be synchronously
-            // and have it pickup the replies and do the aggregation (eg we use a latch to wait)
-            // wait for aggregation to be done
-            log.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
-            aggregationOnTheFlyDone.await();
-
-            // did we fail for whatever reason, if so throw that caused exception
-            if (executionException.get() != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Parallel processing failed due {}", executionException.get().getMessage());
-                }
-                throw executionException.get();
-            }
+            ReactiveHelper.scheduleMain(state);
         }
 
-        // no everything is okay so we are done
-        log.debug("Done parallel processing {} exchanges", total);
+        // the remainder of the multicast will be completed asynchronously,
+        // so we return now; the callback will be invoked later and will then
+        // continue routing from where we left off
+        return false;
     }
 
-    /**
-     * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing.
-     * <p/>
-     * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
-     * before we perform aggregation. Instead this separate thread will run and aggregate when new
-     * completed tasks is done.
-     * <p/>
-     * The logic is fairly complex as this implementation has to keep track how far it got, and also
-     * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue
-     * processing when the entire splitting is done.
-     */
-    private final class AggregateOnTheFlyTask implements Runnable {
-
-        private final AtomicReference<Exchange> result;
-        private final Exchange original;
-        private final AtomicInteger total;
-        private final CompletionService<Exchange> completion;
-        private final AtomicBoolean running;
-        private final CountDownLatch aggregationOnTheFlyDone;
-        private final AtomicBoolean allTasksSubmitted;
-        private final AtomicReference<Exception> executionException;
-
-        private AggregateOnTheFlyTask(AtomicReference<Exchange> result, Exchange original, AtomicInteger total,
-                                      CompletionService<Exchange> completion, AtomicBoolean running,
-                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
-                                      AtomicReference<Exception> executionException) {
-            this.result = result;
-            this.original = original;
-            this.total = total;
-            this.completion = completion;
-            this.running = running;
-            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
-            this.allTasksSubmitted = allTasksSubmitted;
-            this.executionException = executionException;
-        }
-
-        public void run() {
-            log.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());
-
-            try {
-                aggregateOnTheFly();
-            } catch (Throwable e) {
-                if (e instanceof Exception) {
-                    executionException.set((Exception) e);
-                } else {
-                    executionException.set(RuntimeCamelException.wrapRuntimeCamelException(e));
-                }
-            } finally {
-                // must signal we are done so the latch can open and let the other thread continue processing
-                log.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
-                log.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
-                aggregationOnTheFlyDone.countDown();
-            }
-        }
-
-        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
-            final AtomicBoolean timedOut = new AtomicBoolean();
-            boolean stoppedOnException = false;
-            final StopWatch watch = new StopWatch();
-            final AtomicInteger aggregated = new AtomicInteger();
-            boolean done = false;
-            // not a for loop as on the fly may still run
-            while (!done) {
-                // check if we have already aggregate everything
-                if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
-                    log.debug("Done aggregating {} exchanges on the fly.", aggregated);
-                    break;
-                }
-
-                Future<Exchange> future;
-                if (timedOut.get()) {
-                    // we are timed out but try to grab if some tasks has been completed
-                    // poll will return null if no tasks is present
-                    future = completion.poll();
-                    log.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
-                } else if (timeout > 0) {
-                    long left = timeout - watch.taken();
-                    if (left < 0) {
-                        left = 0;
-                    }
-                    log.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
-                    future = completion.poll(left, TimeUnit.MILLISECONDS);
-                } else {
-                    log.trace("Polling completion task #{}", aggregated);
-                    // we must not block so poll every second
-                    future = completion.poll(1, TimeUnit.SECONDS);
-                    if (future == null) {
-                        // and continue loop which will recheck if we are done
-                        continue;
-                    }
-                }
-
-                if (future == null) {
-                    ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
-                    if (parallelAggregate) {
-                        aggregateExecutorService.submit(task);
-                    } else {
-                        // in non parallel mode then just run the task
-                        task.run();
-                    }
-                } else {
-                    // there is a result to aggregate
-                    Exchange subExchange = future.get();
-
-                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
-                    Integer number = getExchangeIndex(subExchange);
-                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, log);
-                    if (stopOnException && !continueProcessing) {
-                        // we want to stop on exception and an exception or failure occurred
-                        // this is similar to what the pipeline does, so we should do the same to not surprise end users
-                        // so we should set the failed exchange as the result and break out
-                        result.set(subExchange);
-                        stoppedOnException = true;
-                        break;
-                    }
-
-                    // we got a result so aggregate it
-                    ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
-                    if (parallelAggregate) {
-                        aggregateExecutorService.submit(task);
-                    } else {
-                        // in non parallel mode then just run the task
-                        task.run();
-                    }
-                }
-            }
-
-            if (timedOut.get() || stoppedOnException) {
-                if (timedOut.get()) {
-                    log.debug("Cancelling tasks due timeout after {} millis.", timeout);
-                }
-                if (stoppedOnException) {
-                    log.debug("Cancelling tasks due stopOnException.");
-                }
-                // cancel tasks as we timed out (its safe to cancel done tasks)
-                running.set(false);
-            }
+    protected void schedule(Runnable runnable) {
+        if (isParallelProcessing()) {
+            executorService.submit(() -> ReactiveHelper.schedule(runnable));
+        } else {
+            ReactiveHelper.scheduleLast(runnable, "Multicast next step");
         }
     }
 
-    /**
-     * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
-     */
-    private final class ParallelAggregateTask implements Runnable {
-
-        private final AtomicReference<Exchange> result;
-        private final Exchange subExchange;
-        private final AtomicInteger aggregated;
+    protected class MulticastState implements Runnable {
 
-        private ParallelAggregateTask(AtomicReference<Exchange> result, Exchange subExchange, AtomicInteger aggregated) {
-            this.result = result;
-            this.subExchange = subExchange;
-            this.aggregated = aggregated;
-        }
+        final Exchange original;
+        final Iterable<ProcessorExchangePair> pairs;
+        final AsyncCallback callback;
+        final Iterator<ProcessorExchangePair> iterator;
+        final ReentrantLock lock;
+        final AsyncCompletionService<Exchange> completion;
+        final AtomicReference<Exchange> result;
+        final AtomicInteger nbExchangeSent = new AtomicInteger();
+        final AtomicInteger nbAggregated = new AtomicInteger();
+        final AtomicBoolean allSent = new AtomicBoolean();
+        final AtomicBoolean done = new AtomicBoolean();
 
-        @Override
-        public void run() {
-            try {
-                if (parallelAggregate) {
-                    doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
-                } else {
-                    doAggregate(getAggregationStrategy(subExchange), result, subExchange);
-                }
-            } catch (Throwable e) {
-                if (isStopOnAggregateException()) {
-                    throw e;
-                } else {
-                    // wrap in exception to explain where it failed
-                    CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e);
-                    subExchange.setException(cex);
-                    log.debug(cex.getMessage(), cex);
-                }
-            } finally {
-                aggregated.incrementAndGet();
+        MulticastState(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+            this.original = original;
+            this.pairs = pairs;
+            this.callback = callback;
+            this.iterator = pairs.iterator();
+            this.lock = new ReentrantLock();
+            this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock);
+            this.result = new AtomicReference<>();
+            if (timeout > 0) {
+                schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS);
             }
         }
-    }
-
-    /**
-     * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
-     */
-    private final class ParallelAggregateTimeoutTask implements Runnable {
 
-        private final Exchange original;
-        private final AtomicReference<Exchange> result;
-        private final CompletionService<Exchange> completion;
-        private final AtomicInteger aggregated;
-        private final AtomicInteger total;
-        private final AtomicBoolean timedOut;
-
-        private ParallelAggregateTimeoutTask(Exchange original, AtomicReference<Exchange> result, CompletionService<Exchange> completion,
-                                             AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
-            this.original = original;
-            this.result = result;
-            this.completion = completion;
-            this.aggregated = aggregated;
-            this.total = total;
-            this.timedOut = timedOut;
+        public String toString() {
+            return "Step[" + original.getExchangeId() + "," + MulticastProcessor.this + "]";
         }
 
-        @Override
         public void run() {
-            AggregationStrategy strategy = getAggregationStrategy(null);
-            // notify the strategy we timed out
-            Exchange oldExchange = result.get();
-            if (oldExchange == null) {
-                // if they all timed out the result may not have been set yet, so use the original exchange
-                oldExchange = original;
-            }
-            strategy.timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
-            log.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
-            timedOut.set(true);
-
-            // mark that index as timed out, which allows us to try to retrieve
-            // any already completed tasks in the next loop
-            if (completion instanceof SubmitOrderedCompletionService) {
-                ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
-            }
-
-            // we timed out so increment the counter
-            aggregated.incrementAndGet();
-        }
-    }
-
-    protected boolean doProcessSequential(Exchange original, AtomicReference<Exchange> result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
-        AtomicInteger total = new AtomicInteger();
-        Iterator<ProcessorExchangePair> it = pairs.iterator();
-
-        while (it.hasNext()) {
-            ProcessorExchangePair pair = it.next();
-            // in case the iterator returns null then continue to next
-            if (pair == null) {
-                continue;
-            }
-            Exchange subExchange = pair.getExchange();
-            updateNewExchange(subExchange, total.get(), pairs, it);
-
-            boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
-            if (!sync) {
-                if (log.isTraceEnabled()) {
-                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
-                }
-                // the remainder of the multicast will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
-            }
-
-            if (log.isTraceEnabled()) {
-                log.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
-            }
-
-            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
-            // remember to test for stop on exception and aggregate before copying back results
-            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
-            if (stopOnException && !continueProcessing) {
-                if (subExchange.getException() != null) {
-                    // wrap in exception to explain where it failed
-                    CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
-                    subExchange.setException(cause);
+            try {
+                if (done.get()) {
+                    return;
                 }
-                // we want to stop on exception, and the exception was handled by the error handler
-                // this is similar to what the pipeline does, so we should do the same to not surprise end users
-                // so we should set the failed exchange as the result and be done
-                result.set(subExchange);
-                return true;
-            }
-
-            log.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);
-
-            if (parallelAggregate) {
-                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
-            } else {
-                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
-            }
-
-            total.incrementAndGet();
-        }
-
-        log.debug("Done sequential processing {} exchanges", total);
-
-        return true;
-    }
 
-    private boolean doProcessSequential(final Exchange original, final AtomicReference<Exchange> result,
-                                        final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
-                                        final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
-        boolean sync = true;
-
-        final Exchange exchange = pair.getExchange();
-        Processor processor = pair.getProcessor();
-        final Producer producer = pair.getProducer();
-
-        try {
-            StopWatch sw = null;
-            if (producer != null) {
-                boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
-                if (sending) {
-                    sw = new StopWatch();
+                // Check if the iterator is empty
+                // This can only happen the very first time we check the existence
+                // of an item before queuing the run.
+                if (!iterator.hasNext()) {
+                    doDone(null, true);
+                    return;
                 }
-            }
-
-            // compute time taken if sending to another endpoint
-            final StopWatch watch = sw;
-
-            // let the prepared process it, remember to begin the exchange pair
-            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
-            pair.begin();
-            sync = async.process(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // we are done with the exchange pair
-                    pair.done();
-
-                    // okay we are done, so notify the exchange was sent
-                    if (producer != null && watch != null) {
-                        long timeTaken = watch.taken();
-                        Endpoint endpoint = producer.getEndpoint();
-                        // emit event that the exchange was sent to the endpoint
-                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
-                    }
 
-                    // we only have to handle async completion of the routing slip
-                    if (doneSync) {
-                        return;
-                    }
+                ProcessorExchangePair pair = iterator.next();
+                boolean hasNext = iterator.hasNext();
+                Exchange exchange = pair.getExchange();
 
-                    // continue processing the multicast asynchronously
-                    Exchange subExchange = exchange;
-
-                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
-                    // remember to test for stop on exception and aggregate before copying back results
-                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
-                    if (stopOnException && !continueProcessing) {
-                        if (subExchange.getException() != null) {
-                            // wrap in exception to explain where it failed
-                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
-                        } else {
-                            // we want to stop on exception, and the exception was handled by the error handler
-                            // this is similar to what the pipeline does, so we should do the same to not surprise end users
-                            // so we should set the failed exchange as the result and be done
-                            result.set(subExchange);
-                        }
-                        // and do the done work
-                        doDone(original, subExchange, pairs, callback, false, true);
-                        return;
-                    }
+                int index = nbExchangeSent.getAndIncrement();
+                updateNewExchange(exchange, index, pairs, hasNext);
 
-                    try {
-                        if (parallelAggregate) {
-                            doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
-                        } else {
-                            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
-                        }
-                    } catch (Throwable e) {
-                        original.setException(e);
-                        // and do the done work
-                        doDone(original, null, pairs, callback, false, true);
-                        return;
+                // Schedule the processing of the next pair
+                if (hasNext) {
+                    if (isParallelProcessing()) {
+                        schedule(this);
                     }
+                } else {
+                    allSent.set(true);
+                }
 
-                    total.incrementAndGet();
-
-                    // maybe there are more processors to multicast
-                    while (it.hasNext()) {
-
-                        // prepare and run the next
-                        ProcessorExchangePair pair = it.next();
-                        subExchange = pair.getExchange();
-                        updateNewExchange(subExchange, total.get(), pairs, it);
-                        boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
+                completion.submit(exchangeResult -> {
+                    // compute time taken if sending to another endpoint
+                    StopWatch watch = beforeSend(pair);
 
-                        if (!sync) {
-                            log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
-                            return;
-                        }
+                    AsyncProcessor async = AsyncProcessorConverterHelper.convert(pair.getProcessor());
+                    async.process(exchange, doneSync -> {
+                        afterSend(pair, watch);
 
                         // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                         // remember to test for stop on exception and aggregate before copying back results
-                        continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), log);
+                        boolean continueProcessing = PipelineHelper.continueProcessing(exchange, "Multicast processing failed for number " + index, log);
                         if (stopOnException && !continueProcessing) {
-                            if (subExchange.getException() != null) {
+                            if (exchange.getException() != null) {
                                 // wrap in exception to explain where it failed
-                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+                                exchange.setException(new CamelExchangeException("Multicast processing failed for number " + index, exchange, exchange.getException()));
                             } else {
                                 // we want to stop on exception, and the exception was handled by the error handler
                                 // this is similar to what the pipeline does, so we should do the same to not surprise end users
                                 // so we should set the failed exchange as the result and be done
-                                result.set(subExchange);
+                                result.set(exchange);
                             }
                             // and do the done work
-                            doDone(original, subExchange, pairs, callback, false, true);
+                            doDone(exchange, true);
                             return;
                         }
 
-                        // must catch any exceptions from aggregation
-                        try {
-                            if (parallelAggregate) {
-                                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
-                            } else {
-                                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
-                            }
-                        } catch (Throwable e) {
-                            // wrap in exception to explain where it failed
-                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
-                            // and do the done work
-                            doDone(original, subExchange, pairs, callback, false, true);
-                            return;
+                        exchangeResult.accept(exchange);
+
+                        // aggregate exchanges if any
+                        aggregate();
+
+                        // next step
+                        if (hasNext && !isParallelProcessing()) {
+                            schedule(this);
                         }
+                    });
+                });
+            } catch (Exception e) {
+                original.setException(e);
+                doDone(null, false);
+            }
+        }
 
-                        total.incrementAndGet();
+        protected void aggregate() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    Exchange exchange;
+                    while (!done.get() && (exchange = completion.poll()) != null) {
+                        doAggregate(result, exchange);
+                        if (nbAggregated.incrementAndGet() >= nbExchangeSent.get() && allSent.get()) {
+                            doDone(result.get(), true);
+                        }
                     }
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
 
-                    // do the done work
-                    subExchange = result.get() != null ? result.get() : null;
-                    doDone(original, subExchange, pairs, callback, false, true);
+        protected void timeout() {
+            Lock lock = this.lock;
+            if (lock.tryLock()) {
+                try {
+                    while (nbAggregated.get() < nbExchangeSent.get()) {
+                        Exchange exchange = completion.pollUnordered();
+                        int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
+                        while (nbAggregated.get() < index) {
+                            AggregationStrategy strategy = getAggregationStrategy(null);
+                            strategy.timeout(result.get() != null ? result.get() : original,
+                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
+                        }
+                        if (exchange != null) {
+                            doAggregate(result, exchange);
+                            nbAggregated.incrementAndGet();
+                        }
+                    }
+                    doDone(result.get(), true);
+                } catch (Throwable e) {
+                    original.setException(e);
+                    // and do the done work
+                    doDone(null, false);
+                } finally {
+                    lock.unlock();
                 }
-            });
-        } finally {
+            }
         }
 
-        return sync;
+        protected void doDone(Exchange exchange, boolean forceExhaust) {
+            if (done.compareAndSet(false, true)) {
+                MulticastProcessor.this.doDone(original, exchange, pairs, callback, false, forceExhaust);
+            }
+        }
     }
 
-    private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
-        final Exchange exchange = pair.getExchange();
-        Processor processor = pair.getProcessor();
-        Producer producer = pair.getProducer();
-
-        // compute time taken if sending to another endpoint
-        StopWatch watch = null;
-        try {
-            if (producer != null) {
-                boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
-                if (sending) {
-                    watch = new StopWatch();
+    protected void schedule(Executor executor, Runnable runnable, long delay, TimeUnit unit) {
+        if (executor instanceof ScheduledExecutorService) {
+            ((ScheduledExecutorService) executor).schedule(runnable, delay, unit);
+        } else {
+            executor.execute(() -> {
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                    // ignore
                 }
+                runnable.run();
+            });
+        }
+    }
+
+    protected StopWatch beforeSend(ProcessorExchangePair pair) {
+        StopWatch watch;
+        final Exchange e = pair.getExchange();
+        final Producer p = pair.getProducer();
+        if (p != null) {
+            boolean sending = EventHelper.notifyExchangeSending(e.getContext(), e, p.getEndpoint());
+            if (sending) {
+                watch = new StopWatch();
+            } else {
+                watch = null;
             }
-            // let the prepared process it, remember to begin the exchange pair
-            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
-            pair.begin();
-            // we invoke it synchronously as parallel async routing is too hard
-            AsyncProcessorHelper.process(async, exchange);
-        } finally {
-            pair.done();
-            if (producer != null && watch != null) {
-                Endpoint endpoint = producer.getEndpoint();
-                long timeTaken = watch.taken();
-                // emit event that the exchange was sent to the endpoint
-                // this is okay to do here in the finally block, as the processing is not using the async routing engine
-                //( we invoke it synchronously as parallel async routing is too hard)
-                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
-            }
+        } else {
+            watch = null;
+        }
+
+        // remember to begin the exchange pair before the prepared processor handles it
+        pair.begin();
+
+        // return the watch
+        return watch;
+    }
+
+    protected void afterSend(ProcessorExchangePair pair, StopWatch watch) {
+        // we are done with the exchange pair
+        pair.done();
+
+        // okay we are done, so notify the exchange was sent
+        final Producer producer = pair.getProducer();
+        if (producer != null && watch != null) {
+            long timeTaken = watch.taken();
+            final Exchange e = pair.getExchange();
+            Endpoint endpoint = producer.getEndpoint();
+            // emit event that the exchange was sent to the endpoint
+            EventHelper.notifyExchangeSent(e.getContext(), e, endpoint, timeTaken);
         }
     }
 
@@ -886,11 +517,28 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         if (exception) {
             // multicast uses error handling on its output processors and they have tried to redeliver
             // so we shall signal back to the other error handlers that we are exhausted and they should not
-            // also try to redeliver as we will then do that twice
+            // also try to redeliver as we would then do that twice
             original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
         }
 
-        callback.done(doneSync);
+        ReactiveHelper.callback(callback);
+    }
+
+    /**
+     * Aggregate the {@link Exchange} with the current result.
+     * Not synchronized itself: delegates to the synchronized variant unless parallelAggregate is enabled (disabled by default).
+     *
+     * @param result   the current result
+     * @param exchange the exchange to be added to the result
+     * @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+     * @see #doAggregateSync(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
+     */
+    protected void doAggregate(AtomicReference<Exchange> result, Exchange exchange) {
+        if (parallelAggregate) {
+            doAggregateInternal(getAggregationStrategy(exchange), result, exchange);
+        } else {
+            doAggregateSync(getAggregationStrategy(exchange), result, exchange);
+        }
     }
 
     /**
@@ -902,7 +550,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
      * @param exchange the exchange to be added to the result
      * @see #doAggregateInternal(AggregationStrategy, AtomicReference, org.apache.camel.Exchange)
      */
-    protected synchronized void doAggregate(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
+    protected synchronized void doAggregateSync(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
         doAggregateInternal(strategy, result, exchange);
     }
 
@@ -914,7 +562,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregate
+     * @see #doAggregateSync
      */
     protected void doAggregateInternal(AggregationStrategy strategy, AtomicReference<Exchange> result, Exchange exchange) {
         if (strategy != null) {
@@ -925,10 +573,9 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
-                                     Iterator<ProcessorExchangePair> it) {
+    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
         exchange.setProperty(Exchange.MULTICAST_INDEX, index);
-        if (it.hasNext()) {
+        if (hasNext) {
             exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
         } else {
             exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
@@ -1119,10 +766,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
         if (isParallelProcessing() && executorService == null) {
             throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
         }
-        if (timeout > 0 && !isParallelProcessing()) {
-            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
-        }
-        if (isParallelProcessing() && aggregateExecutorService == null) {
+        if (aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
             // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing
@@ -1145,7 +789,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
      */
     protected synchronized ExecutorService createAggregateExecutorService(String name) {
         // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in
-        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+        return camelContext.getExecutorServiceManager().newScheduledThreadPool(this, name, 0);
     }
 
     @Override
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
index 31c93de..e544e0f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -20,15 +20,22 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.Traceable;
+import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
+import org.apache.camel.support.ServiceHelper;
+import org.apache.camel.support.ServiceSupport;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 
@@ -36,12 +43,15 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
  * Creates a Pipeline pattern where the output of the previous step is sent as
  * input to the next step, reusing the same message exchanges
  */
-public class Pipeline extends MulticastProcessor {
+public class Pipeline extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {
 
+    private final CamelContext camelContext;
+    private List<AsyncProcessor> processors;
     private String id;
 
     public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
-        super(camelContext, processors);
+        this.camelContext = camelContext;
+        this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
     }
 
     public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
@@ -77,130 +87,74 @@ public class Pipeline extends MulticastProcessor {
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        Iterator<Processor> processors = getProcessors().iterator();
-        Exchange nextExchange = exchange;
-        boolean first = true;
-
-        while (continueRouting(processors, nextExchange)) {
-            if (first) {
-                first = false;
-            } else {
-                // prepare for next run
-                nextExchange = createNextExchange(nextExchange);
-            }
-
-            // get the next processor
-            Processor processor = processors.next();
+        ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+                "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+        return false;
+    }
 
-            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
-            boolean sync = process(exchange, nextExchange, callback, processors, async);
+    protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) {
+        if (continueRouting(processors, exchange)
+                && (first || continueProcessing(exchange, "so breaking out of pipeline", log))) {
 
-            // continue as long its being processed synchronously
-            if (!sync) {
-                log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-                // the remainder of the pipeline will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
+            // prepare for next run
+            if (exchange.hasOut()) {
+                exchange.setIn(exchange.getOut());
+                exchange.setOut(null);
             }
 
-            log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+            // get the next processor
+            AsyncProcessor processor = processors.next();
 
-            // check for error if so we should break out
-            if (!continueProcessing(nextExchange, "so breaking out of pipeline", log)) {
-                break;
-            }
+            processor.process(exchange, doneSync ->
+                    ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+                            "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
         }
+        else {
+            ExchangeHelper.copyResults(exchange, exchange);
 
-        // logging nextExchange as it contains the exchange that might have altered the payload and since
-        // we are logging the completion if will be confusing if we log the original instead
-        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-        log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange);
-
-        // copy results back to the original exchange
-        ExchangeHelper.copyResults(exchange, nextExchange);
-
-        callback.done(true);
-        return true;
-    }
-
-    private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback,
-                            final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) {
-        // this does the actual processing so log at trace level
-        log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
-        // implement asynchronous routing logic in callback so we can have the callback being
-        // triggered and then continue routing where we left
-        boolean sync = asyncProcessor.process(exchange, new AsyncCallback() {
-            @Override
-            public void done(final boolean doneSync) {
-                // we only have to handle async completion of the pipeline
-                if (doneSync) {
-                    return;
-                }
-
-                // continue processing the pipeline asynchronously
-                Exchange nextExchange = exchange;
-                while (continueRouting(processors, nextExchange)) {
-                    AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
-
-                    // check for error if so we should break out
-                    if (!continueProcessing(nextExchange, "so breaking out of pipeline", log)) {
-                        break;
-                    }
-
-                    nextExchange = createNextExchange(nextExchange);
-                    boolean isDoneSync = process(original, nextExchange, callback, processors, processor);
-                    if (!isDoneSync) {
-                        log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-                        return;
-                    }
-                }
-
-                ExchangeHelper.copyResults(original, nextExchange);
-                log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original);
-                callback.done(false);
-            }
-        });
-
-        return sync;
-    }
+            // the pipeline routes the same exchange instance through every step (there is no separate
+            // nextExchange any more), so the exchange logged here carries the payload as left by the last
+            // processor that ran, i.e. this trace is the *after* snapshot taken at completion of the pipeline
+            log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
 
-    /**
-     * Strategy method to create the next exchange from the previous exchange.
-     * <p/>
-     * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
-     *
-     * @param previousExchange the previous exchange
-     * @return a new exchange
-     */
-    protected Exchange createNextExchange(Exchange previousExchange) {
-        return PipelineHelper.createNextExchange(previousExchange);
+            ReactiveHelper.callback(callback);
+        }
     }
 
-    protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
-        boolean answer = true;
-
+    protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) {
         Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
         if (stop != null) {
             boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
             if (doStop) {
                 log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
-                answer = false;
+                return false;
             }
-        } else {
-            // continue if there are more processors to route
-            answer = it.hasNext();
         }
-
+        // continue if there are more processors to route
+        boolean answer = it.hasNext();
         log.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer);
         return answer;
     }
 
     @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(processors);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(processors);
+    }
+
+    @Override
     public String toString() {
         return "Pipeline[" + getProcessors() + "]";
     }
 
+    public List<Processor> getProcessors() {
+        return (List) processors;
+    }
+
     @Override
     public String getTraceLabel() {
         return "pipeline";
@@ -215,4 +169,15 @@ public class Pipeline extends MulticastProcessor {
     public void setId(String id) {
         this.id = id;
     }
+
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        return new ArrayList<>(processors);
+    }
+
+    public boolean hasNext() {
+        return processors != null && !processors.isEmpty();
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 4df33a3..7851ca0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -18,8 +18,6 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -39,16 +37,17 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.reifier.ErrorHandlerReifier;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.CamelContextHelper;
-import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -84,171 +83,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     protected final Processor onPrepareProcessor;
     protected final Processor onExceptionProcessor;
 
-    /**
-     * Contains the current redelivery data
-     */
-    protected class RedeliveryData {
-        // redelivery state
-        Exchange original;
-        boolean sync = true;
-        int redeliveryCounter;
-        long redeliveryDelay;
-        Predicate retryWhilePredicate;
-        boolean redeliverFromSync;
-
-        // default behavior which can be overloaded on a per exception basis
-        RedeliveryPolicy currentRedeliveryPolicy;
-        Processor failureProcessor;
-        Processor onRedeliveryProcessor;
-        Processor onExceptionProcessor;
-        Predicate handledPredicate;
-        Predicate continuedPredicate;
-        boolean useOriginalInMessage;
-
-        public RedeliveryData() {
-            // init with values from the error handler
-            this.retryWhilePredicate = retryWhilePolicy;
-            this.currentRedeliveryPolicy = redeliveryPolicy;
-            this.onRedeliveryProcessor = redeliveryProcessor;
-            this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
-            this.handledPredicate = getDefaultHandledPredicate();
-            this.useOriginalInMessage = useOriginalMessagePolicy;
-        }
-    }
-
-    /**
-     * Task for sleeping during redelivery attempts.
-     * <p/>
-     * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
-     * is used for sleeping and trigger redeliveries.
-     */
-    private final class RedeliverSleepTask {
-
-        private final RedeliveryPolicy policy;
-        private final long delay;
-
-        RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
-            this.policy = policy;
-            this.delay = delay;
-        }
-
-        public boolean sleep() throws InterruptedException {
-            // for small delays then just sleep
-            if (delay < 1000) {
-                policy.sleep(delay);
-                return true;
-            }
-
-            StopWatch watch = new StopWatch();
-
-            log.debug("Sleeping for: {} millis until attempting redelivery", delay);
-            while (watch.taken() < delay) {
-                // sleep using 1 sec interval
-
-                long delta = delay - watch.taken();
-                long max = Math.min(1000, delta);
-                if (max > 0) {
-                    log.trace("Sleeping for: {} millis until waking up for re-check", max);
-                    Thread.sleep(max);
-                }
-
-                // are we preparing for shutdown then only do redelivery if allowed
-                if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
-                    log.debug("Rejected redelivery while stopping");
-                    return false;
-                }
-            }
-
-            return true;
-        }
-    }
-
-    /**
-     * Tasks which performs asynchronous redelivery attempts, and being triggered by a
-     * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
-     * has to be delayed before a redelivery attempt is performed.
-     */
-    private final class AsyncRedeliveryTask implements Callable<Boolean> {
-
-        private final Exchange exchange;
-        private final AsyncCallback callback;
-        private final RedeliveryData data;
-
-        AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
-            this.exchange = exchange;
-            this.callback = callback;
-            this.data = data;
-        }
-
-        public Boolean call() throws Exception {
-            // prepare for redelivery
-            prepareExchangeForRedelivery(exchange, data);
-
-            // letting onRedeliver be executed at first
-            deliverToOnRedeliveryProcessor(exchange, data);
-
-            if (log.isTraceEnabled()) {
-                log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", exchange.getExchangeId(), outputAsync, exchange);
-            }
-
-            // emmit event we are doing redelivery
-            EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
-
-            // process the exchange (also redelivery)
-            boolean sync;
-            if (data.redeliverFromSync) {
-                // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
-                // this error handler, which means we have to invoke the callback with false, to have the callback
-                // be notified when we are done
-                sync = outputAsync.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
-
-                        // mark we are in sync mode now
-                        data.sync = false;
-
-                        // only process if the exchange hasn't failed
-                        // and it has not been handled by the error processor
-                        if (isDone(exchange)) {
-                            callback.done(false);
-                            return;
-                        }
-
-                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        processAsyncErrorHandler(exchange, callback, data);
-                    }
-                });
-            } else {
-                // this redelivery task was scheduled from asynchronous, which means we should only
-                // handle when the asynchronous task was done
-                sync = outputAsync.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
-
-                        // this callback should only handle the async case
-                        if (doneSync) {
-                            return;
-                        }
-
-                        // mark we are in async mode now
-                        data.sync = false;
-
-                        // only process if the exchange hasn't failed
-                        // and it has not been handled by the error processor
-                        if (isDone(exchange)) {
-                            callback.done(doneSync);
-                            return;
-                        }
-                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        processAsyncErrorHandler(exchange, callback, data);
-                    }
-                });
-            }
-
-            return sync;
-        }
-    }
-
     public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
                                   Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
                                   String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy,
@@ -301,6 +135,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         }
     }
 
+    public void process(Exchange exchange) throws Exception {
+        if (output == null) {
+            // no output then just return
+            return;
+        }
+        awaitManager.process(this, exchange);
+    }
+
+    /**
+     * Process the exchange using redelivery error handling.
+     */
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        // Create the redelivery state object for this exchange
+        RedeliveryState state = new RedeliveryState(exchange, callback);
+        // Run it
+        ReactiveHelper.scheduleMain(state);
+        return false;
+    }
+
     /**
      * Allows to change the output of the error handler which are used when optimising the
      * JMX instrumentation to use either an advice or wrapped processor when calling a processor.
@@ -332,61 +185,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         return answer;
     }
 
-    protected boolean isRunAllowed(RedeliveryData data) {
-        // if camel context is forcing a shutdown then do not allow running
-        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
-        if (forceShutdown) {
-            log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
-            return false;
-        }
-
-        // redelivery policy can control if redelivery is allowed during stopping/shutdown
-        // but this only applies during a redelivery (counter must > 0)
-        if (data.redeliveryCounter > 0) {
-            if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
-                log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
-                return true;
-            } else if (preparingShutdown) {
-                // we are preparing for shutdown, now determine if we can still run
-                boolean answer = isRunAllowedOnPreparingShutdown();
-                log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
-                return answer;
-            }
-        }
-
-        // we cannot run if we are stopping/stopped
-        boolean answer = !isStoppingOrStopped();
-        log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
-        return answer;
-    }
-
     protected boolean isRunAllowedOnPreparingShutdown() {
         return false;
     }
 
-    protected boolean isRedeliveryAllowed(RedeliveryData data) {
-        // redelivery policy can control if redelivery is allowed during stopping/shutdown
-        // but this only applies during a redelivery (counter must > 0)
-        if (data.redeliveryCounter > 0) {
-            boolean stopping = isStoppingOrStopped();
-            if (!preparingShutdown && !stopping) {
-                log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
-                return true;
-            } else {
-                // we are stopping or preparing to shutdown
-                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
-                    log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
-                    return true;
-                } else {
-                    log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
-                    return false;
-                }
-            }
-        }
-
-        return true;
-    }
-
     @Override
     public void prepareShutdown(boolean suspendOnly, boolean forced) {
         // prepare for shutdown, eg do not allow redelivery if configured
@@ -394,202 +196,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         preparingShutdown = true;
     }
 
-    public void process(Exchange exchange) throws Exception {
-        if (output == null) {
-            // no output then just return
-            return;
-        }
-
-        // inline org.apache.camel.support.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
-        // to optimize and reduce stacktrace lengths
-        final CountDownLatch latch = new CountDownLatch(1);
-        boolean sync = process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (!doneSync) {
-                    awaitManager.countDown(exchange, latch);
-                }
-            }
-        });
-        if (!sync) {
-            awaitManager.await(exchange, latch);
-        }
-    }
-
-    /**
-     * Process the exchange using redelivery error handling.
-     */
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        final RedeliveryData data = new RedeliveryData();
-
-        // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
-        // original Exchange is being redelivered, and not a mutated Exchange
-        data.original = defensiveCopyExchangeIfNeeded(exchange);
-
-        // use looping to have redelivery attempts
-        while (true) {
-
-            // can we still run
-            if (!isRunAllowed(data)) {
-                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
-                if (exchange.getException() == null) {
-                    exchange.setException(new RejectedExecutionException());
-                }
-                // we cannot process so invoke callback
-                callback.done(data.sync);
-                return data.sync;
-            }
-
-            // did previous processing cause an exception?
-            boolean handle = shouldHandleException(exchange);
-            if (handle) {
-                handleException(exchange, data, isDeadLetterChannel());
-                onExceptionOccurred(exchange, data);
-            }
-
-            // compute if we are exhausted, and whether redelivery is allowed
-            boolean exhausted = isExhausted(exchange, data);
-            boolean redeliverAllowed = isRedeliveryAllowed(data);
-
-            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
-            if (!redeliverAllowed || exhausted) {
-                Processor target = null;
-                boolean deliver = true;
-
-                // the unit of work may have an optional callback associated we need to leverage
-                SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
-                if (uowCallback != null) {
-                    // signal to the callback we are exhausted
-                    uowCallback.onExhausted(exchange);
-                    // do not deliver to the failure processor as its been handled by the callback instead
-                    deliver = false;
-                }
-
-                if (deliver) {
-                    // should deliver to failure processor (either from onException or the dead letter channel)
-                    target = data.failureProcessor != null ? data.failureProcessor : deadLetter;
-                }
-                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
-                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
-                boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == deadLetter);
-                boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
-                // we are breaking out
-                return sync;
-            }
-
-            if (data.redeliveryCounter > 0) {
-                // calculate delay
-                data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
-
-                if (data.redeliveryDelay > 0) {
-                    // okay there is a delay so create a scheduled task to have it executed in the future
-
-                    if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
-
-                        // we are doing a redelivery then a thread pool must be configured (see the doStart method)
-                        ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
-
-                        // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
-                        // have it being executed in the future, or immediately
-                        // we are continuing asynchronously
-
-                        // mark we are routing async from now and that this redelivery task came from a synchronous routing
-                        data.sync = false;
-                        data.redeliverFromSync = true;
-                        AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
-
-                        // schedule the redelivery task
-                        if (log.isTraceEnabled()) {
-                            log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
-                        }
-                        executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
-
-                        return false;
-                    } else {
-                        // async delayed redelivery was disabled or we are transacted so we must be synchronous
-                        // as the transaction manager requires to execute in the same thread context
-                        try {
-                            // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
-                            redeliverySleepCounter.incrementAndGet();
-                            RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
-                            boolean complete = task.sleep();
-                            redeliverySleepCounter.decrementAndGet();
-                            if (!complete) {
-                                // the task was rejected
-                                exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
-                                // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
-                                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
-                                // jump to start of loop which then detects that we are failed and exhausted
-                                continue;
-                            }
-                        } catch (InterruptedException e) {
-                            redeliverySleepCounter.decrementAndGet();
-                            // we was interrupted so break out
-                            exchange.setException(e);
-                            // mark the exchange to stop continue routing when interrupted
-                            // as we do not want to continue routing (for example a task has been cancelled)
-                            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-                            callback.done(data.sync);
-                            return data.sync;
-                        }
-                    }
-                }
-
-                // prepare for redelivery
-                prepareExchangeForRedelivery(exchange, data);
-
-                // letting onRedeliver be executed
-                deliverToOnRedeliveryProcessor(exchange, data);
-
-                // emmit event we are doing redelivery
-                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
-            }
-
-            // process the exchange (also redelivery)
-            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                    // this callback should only handle the async case
-                    if (sync) {
-                        return;
-                    }
-
-                    // mark we are in async mode now
-                    data.sync = false;
-
-                    // if we are done then notify callback and exit
-                    if (isDone(exchange)) {
-                        callback.done(sync);
-                        return;
-                    }
-
-                    // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                    // method which takes care of this in a asynchronous manner
-                    processAsyncErrorHandler(exchange, callback, data);
-                }
-            });
-
-            if (!sync) {
-                // the remainder of the Exchange is being processed asynchronously so we should return
-                return false;
-            }
-            // we continue to route synchronously
-
-            // if we are done then notify callback and exit
-            boolean done = isDone(exchange);
-            if (done) {
-                callback.done(true);
-                return true;
-            }
-
-            // error occurred so loop back around.....
-        }
-    }
-
     /**
      * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
      * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
      *
-     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
-     * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
+     * <p>In order to prevent manipulation of the RedeliveryState, the values of {@link RedeliveryState#redeliveryDelay}
+     * and {@link RedeliveryState#redeliveryCounter} are copied in.</p>
      *
      * @param exchange The current exchange in question.
      * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
@@ -610,91 +222,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     }
 
     /**
-     * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
-     * <p/>
-     * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
-     * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
-     * in terms of logic.
-     */
-    protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
-        // can we still run
-        if (!isRunAllowed(data)) {
-            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException());
-            }
-            callback.done(data.sync);
-            return;
-        }
-
-        // did previous processing cause an exception?
-        boolean handle = shouldHandleException(exchange);
-        if (handle) {
-            handleException(exchange, data, isDeadLetterChannel());
-            onExceptionOccurred(exchange, data);
-        }
-
-        // compute if we are exhausted or not
-        boolean exhausted = isExhausted(exchange, data);
-        if (exhausted) {
-            Processor target = null;
-            boolean deliver = true;
-
-            // the unit of work may have an optional callback associated we need to leverage
-            UnitOfWork uow = exchange.getUnitOfWork();
-            if (uow != null) {
-                SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
-                if (uowCallback != null) {
-                    // signal to the callback we are exhausted
-                    uowCallback.onExhausted(exchange);
-                    // do not deliver to the failure processor as its been handled by the callback instead
-                    deliver = false;
-                }
-            }
-
-            if (deliver) {
-                // should deliver to failure processor (either from onException or the dead letter channel)
-                target = data.failureProcessor != null ? data.failureProcessor : deadLetter;
-            }
-            // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
-            // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
-            boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter;
-            deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
-            // we are breaking out
-            return;
-        }
-
-        if (data.redeliveryCounter > 0) {
-            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
-            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
-
-            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
-            // have it being executed in the future, or immediately
-            // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
-            // to ensure the callback will continue routing from where we left
-            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
-
-            // calculate the redelivery delay
-            data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
-
-            if (data.redeliveryDelay > 0) {
-                // schedule the redelivery task
-                if (log.isTraceEnabled()) {
-                    log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
-                }
-                executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
-            } else {
-                // execute the task immediately
-                executorService.submit(task);
-            }
-        }
-    }
-
-    /**
-     * Performs a defensive copy of the exchange if needed
-     *
-     * @param exchange the exchange
-     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
+     * Performs a defensive copy of the exchange if needed
+     *
+     * @param exchange the exchange
+     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
      */
     protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
         // only do a defensive copy if redelivery is enabled
@@ -790,292 +321,486 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         return null;
     }
 
-    protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
-        Exception caught = exchange.getException();
-
-        // we continue so clear any exceptions
-        exchange.setException(null);
-        // clear rollback flags
-        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
-        // reset cached streams so they can be read again
-        MessageHelper.resetStreamCache(exchange.getIn());
-
-        // its continued then remove traces of redelivery attempted and caught exception
-        exchange.getIn().removeHeader(Exchange.REDELIVERED);
-        exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
-        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
-        exchange.removeProperty(Exchange.FAILURE_HANDLED);
-        // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
-
-        // create log message
-        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
-        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
-        msg = msg + ". Handled and continue routing.";
-
-        // log that we failed but want to continue
-        logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null);
-    }
+    /**
+     * Contains the current redelivery state
+     */
+    protected class RedeliveryState implements Runnable {
+        Exchange original;
+        Exchange exchange;
+        AsyncCallback callback;
+        boolean sync = true;
+        int redeliveryCounter;
+        long redeliveryDelay;
+        Predicate retryWhilePredicate;
+        boolean redeliverFromSync;
+
+        // default behavior which can be overloaded on a per exception basis
+        RedeliveryPolicy currentRedeliveryPolicy;
+        Processor failureProcessor;
+        Processor onRedeliveryProcessor;
+        Processor onExceptionProcessor;
+        Predicate handledPredicate;
+        Predicate continuedPredicate;
+        boolean useOriginalInMessage;
+
+        public RedeliveryState(Exchange exchange, AsyncCallback callback) {
+            // init with values from the error handler
+            this.retryWhilePredicate = retryWhilePolicy;
+            this.currentRedeliveryPolicy = redeliveryPolicy;
+            this.handledPredicate = getDefaultHandledPredicate();
+            this.useOriginalInMessage = useOriginalMessagePolicy;
+            this.onRedeliveryProcessor = redeliveryProcessor;
+            this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
 
-    protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
-        if (!redeliveryEnabled) {
-            throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
+            // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
+            // original Exchange is being redelivered, and not a mutated Exchange
+            this.original = defensiveCopyExchangeIfNeeded(exchange);
+            this.exchange = exchange;
+            this.callback = callback;
         }
-        // there must be a defensive copy of the exchange
-        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
 
-        // okay we will give it another go so clear the exception so we can try again
-        exchange.setException(null);
+        public String toString() {
+            return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]";
+        }
 
-        // clear rollback flags
-        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+        /**
+         * Redelivery logic.
+         */
+        public void run() {
+            // can we still run
+            if (!isRunAllowed()) {
+                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
+                if (exchange.getException() == null) {
+                    exchange.setException(new RejectedExecutionException());
+                }
+                callback.done(false);
+                return;
+            }
 
-        // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
-        // and then put these on the exchange when doing a redelivery / fault processor
+            // did previous processing cause an exception?
+            boolean handle = shouldHandleException(exchange);
+            if (handle) {
+                handleException();
+                onExceptionOccurred();
+            }
 
-        // preserve these headers
-        Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
-        Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
-        Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
+            // compute if we are exhausted or not
+            boolean exhausted = isExhausted(exchange);
+            boolean redeliverAllowed = isRedeliveryAllowed();
 
-        // we are redelivering so copy from original back to exchange
-        exchange.getIn().copyFrom(data.original.getIn());
-        exchange.setOut(null);
-        // reset cached streams so they can be read again
-        MessageHelper.resetStreamCache(exchange.getIn());
+            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
+            if (!redeliverAllowed || exhausted) {
+                Processor target = null;
+                boolean deliver = true;
 
-        // put back headers
-        if (redeliveryCounter != null) {
-            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
-        }
-        if (redeliveryMaxCounter != null) {
-            exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
-        }
-        if (redelivered != null) {
-            exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
-        }
-    }
+                // the unit of work may have an optional callback associated we need to leverage
+                UnitOfWork uow = exchange.getUnitOfWork();
+                if (uow != null) {
+                    SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
+                    if (uowCallback != null) {
+                        // signal to the callback we are exhausted
+                        uowCallback.onExhausted(exchange);
+                        // do not deliver to the failure processor as its been handled by the callback instead
+                        deliver = false;
+                    }
+                }
 
-    protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
-        Exception e = exchange.getException();
-        // e is never null
-
-        Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
-        if (previous != null && previous != e) {
-            // a 2nd exception was thrown while handling a previous exception
-            // so we need to add the previous as suppressed by the new exception
-            // see also FatalFallbackErrorHandler
-            Throwable[] suppressed = e.getSuppressed();
-            boolean found = false;
-            for (Throwable t : suppressed) {
-                if (t == previous) {
-                    found = true;
+                if (deliver) {
+                    // should deliver to failure processor (either from onException or the dead letter channel)
+                    target = failureProcessor != null ? failureProcessor : deadLetter;
                 }
+                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
+                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
+                boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter;
+                deliverToFailureProcessor(target, isDeadLetterChannel, exchange);
+                // we are breaking out
             }
-            if (!found) {
-                e.addSuppressed(previous);
-            }
-        }
+            else if (redeliveryCounter > 0) {
+                // calculate the redelivery delay
+                redeliveryDelay = determineRedeliveryDelay(exchange, currentRedeliveryPolicy, redeliveryDelay, redeliveryCounter);
 
-        // store the original caused exception in a property, so we can restore it later
-        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+                if (redeliveryDelay > 0) {
+                    // okay there is a delay so create a scheduled task to have it executed in the future
 
-        // find the error handler to use (if any)
-        OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
-        if (exceptionPolicy != null) {
-            data.currentRedeliveryPolicy = ErrorHandlerReifier.createRedeliveryPolicy(exceptionPolicy, exchange.getContext(), data.currentRedeliveryPolicy);
-            data.handledPredicate = exceptionPolicy.getHandledPolicy();
-            data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
-            data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
-            data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
+                    if (currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
 
-            // route specific failure handler?
-            Processor processor = null;
-            UnitOfWork uow = exchange.getUnitOfWork();
-            if (uow != null && uow.getRouteContext() != null) {
-                String routeId = uow.getRouteContext().getRoute().getId();
-                processor = exceptionPolicy.getErrorHandler(routeId);
-            } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
-                // note this should really not happen, but we have this code as a fail safe
-                // to be backwards compatible with the old behavior
-                log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
-                processor = exceptionPolicy.getErrorHandlers().iterator().next();
-            }
-            if (processor != null) {
-                data.failureProcessor = processor;
-            }
+                        // we are doing a redelivery then a thread pool must be configured (see the doStart method)
+                        ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
 
-            // route specific on redelivery?
-            processor = exceptionPolicy.getOnRedelivery();
-            if (processor != null) {
-                data.onRedeliveryProcessor = processor;
+                        // schedule the redelivery task
+                        if (log.isTraceEnabled()) {
+                            log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
+                        }
+                        executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+
+                    } else {
+                        // async delayed redelivery was disabled or we are transacted so we must be synchronous
+                        // as the transaction manager requires to execute in the same thread context
+                        try {
+                            // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
+                            redeliverySleepCounter.incrementAndGet();
+                            boolean complete = sleep();
+                            redeliverySleepCounter.decrementAndGet();
+                            if (!complete) {
+                                // the task was rejected
+                                exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
+                                // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
+                                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+                                // re-run this task, which then detects that we are failed and exhausted
+                                ReactiveHelper.schedule(this);
+                            } else {
+                                ReactiveHelper.schedule(this::redeliver);
+                            }
+                        } catch (InterruptedException e) {
+                            redeliverySleepCounter.decrementAndGet();
+                            // we were interrupted so break out
+                            exchange.setException(e);
+                            // mark the exchange to stop continue routing when interrupted
+                            // as we do not want to continue routing (for example a task has been cancelled)
+                            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
+                            ReactiveHelper.callback(callback);
+                        }
+                    }
+                } else {
+                    // execute the task immediately
+                    ReactiveHelper.schedule(this::redeliver);
+                }
             }
-            // route specific on exception occurred?
-            processor = exceptionPolicy.getOnExceptionOccurred();
-            if (processor != null) {
-                data.onExceptionProcessor = processor;
+            else {
+                // Simple delivery
+                outputAsync.process(exchange, doneSync -> {
+                    // only process if the exchange hasn't failed
+                    // and it has not been handled by the error processor
+                    if (isDone(exchange)) {
+                        ReactiveHelper.callback(callback);
+                    } else {
+                        // error occurred so loop back around, which we do by rescheduling this task
+                        ReactiveHelper.schedule(this);
+                    }
+                });
             }
         }
 
-        // only log if not failure handled or not an exhausted unit of work
-        if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
-            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
-                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
-            logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
-        }
+        protected boolean isRunAllowed() {
+            // if camel context is forcing a shutdown then do not allow running
+            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(RedeliveryErrorHandler.this);
+            if (forceShutdown) {
+                log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
+                return false;
+            }
 
-        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
-    }
+            // redelivery policy can control if redelivery is allowed during stopping/shutdown
+            // but this only applies during a redelivery (counter must > 0)
+            if (redeliveryCounter > 0) {
+                if (currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
+                    log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
+                    return true;
+                } else if (preparingShutdown) {
+                    // we are preparing for shutdown, now determine if we can still run
+                    boolean answer = isRunAllowedOnPreparingShutdown();
+                    log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
+                    return answer;
+                }
+            }
 
-    /**
-     * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
-     * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
-     */
-    protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) {
-        if (data.onExceptionProcessor == null) {
-            return;
+            // we cannot run if we are stopping/stopped
+            boolean answer = !isStoppingOrStopped();
+            log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
+            return answer;
         }
 
-        // run this synchronously as its just a Processor
-        try {
-            if (log.isTraceEnabled()) {
-                log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange);
+        protected boolean isRedeliveryAllowed() {
+            // redelivery policy can control if redelivery is allowed during stopping/shutdown
+            // but this only applies during a redelivery (counter must > 0)
+            if (redeliveryCounter > 0) {
+                boolean stopping = isStoppingOrStopped();
+                if (!preparingShutdown && !stopping) {
+                    log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
+                    return true;
+                } else {
+                    // we are stopping or preparing to shutdown
+                    if (currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
+                        log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
+                        return true;
+                    } else {
+                        log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
+                        return false;
+                    }
+                }
             }
-            data.onExceptionProcessor.process(exchange);
-        } catch (Throwable e) {
-            // we dont not want new exception to override existing, so log it as a WARN
-            log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
-        }
-        log.trace("OnExceptionOccurred processor done");
-    }
 
-    /**
-     * Gives an optional configured redelivery processor a chance to process before the Exchange
-     * will be redelivered. This can be used to alter the Exchange.
-     */
-    protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
-        if (data.onRedeliveryProcessor == null) {
-            return;
+            return true;
         }
 
-        if (log.isTraceEnabled()) {
-            log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
-                    data.onRedeliveryProcessor, exchange);
-        }
+        protected void redeliver() {
+            // prepare for redelivery
+            prepareExchangeForRedelivery();
 
-        // run this synchronously as its just a Processor
-        try {
-            data.onRedeliveryProcessor.process(exchange);
-        } catch (Throwable e) {
-            exchange.setException(e);
-        }
-        log.trace("Redelivery processor done");
-    }
+            // letting onRedeliver be executed at first
+            deliverToOnRedeliveryProcessor();
 
-    /**
-     * All redelivery attempts failed so move the exchange to the dead letter queue
-     */
-    protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
-                                                final RedeliveryData data, final AsyncCallback callback) {
-        boolean sync = true;
+            if (log.isTraceEnabled()) {
+                log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", exchange.getExchangeId(), outputAsync, exchange);
+            }
+
+            // emit event we are doing redelivery
+            EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, redeliveryCounter);
 
-        Exception caught = exchange.getException();
+            // process the exchange (also redelivery)
+            outputAsync.process(exchange, doneSync -> {
+                log.trace("Redelivering exchangeId: {}", exchange.getExchangeId());
 
-        // we did not success with the redelivery so now we let the failure processor handle it
-        // clear exception as we let the failure processor handle it
-        exchange.setException(null);
+                // only process if the exchange hasn't failed
+                // and it has not been handled by the error processor
+                if (isDone(exchange)) {
+                    ReactiveHelper.callback(callback);
+                    return;
+                } else {
+                    // error occurred so loop back around, which we do by rescheduling this task
+                    ReactiveHelper.schedule(this);
+                }
+            });
+        }
 
-        final boolean shouldHandle = shouldHandle(exchange, data);
-        final boolean shouldContinue = shouldContinue(exchange, data);
+        protected void prepareExchangeForContinue(Exchange exchange, boolean isDeadLetterChannel) {
+            Exception caught = exchange.getException();
 
-        // regard both handled or continued as being handled
-        boolean handled = false;
+            // we continue so clear any exceptions
+            exchange.setException(null);
+            // clear rollback flags
+            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+            // reset cached streams so they can be read again
+            MessageHelper.resetStreamCache(exchange.getIn());
 
-        // always handle if dead letter channel
-        boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
-        if (handleOrContinue) {
-            // its handled then remove traces of redelivery attempted
+            // its continued then remove traces of redelivery attempted and caught exception
             exchange.getIn().removeHeader(Exchange.REDELIVERED);
             exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
             exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
-            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+            exchange.removeProperty(Exchange.FAILURE_HANDLED);
+            // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
 
-            // and remove traces of rollback only and uow exhausted markers
-            exchange.removeProperty(Exchange.ROLLBACK_ONLY);
-            exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
+            // create log message
+            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+            msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught;
+            msg = msg + ". Handled and continue routing.";
 
-            handled = true;
-        } else {
-            // must decrement the redelivery counter as we didn't process the redelivery but is
-            // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
-            decrementRedeliveryCounter(exchange);
+            // log that we failed but want to continue
+            logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, null);
         }
 
-        // we should allow using the failure processor if we should not continue
-        // or in case of continue then the failure processor is NOT a dead letter channel
-        // because you can continue and still let the failure processor do some routing
-        // before continue in the main route.
-        boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
-
-        if (allowFailureProcessor && processor != null) {
-
-            // prepare original IN body if it should be moved instead of current body
-            if (data.useOriginalInMessage) {
-                log.trace("Using the original IN message instead of current");
-                Message original = ExchangeHelper.getOriginalInMessage(exchange);
-                exchange.setIn(original);
-                if (exchange.hasOut()) {
-                    log.trace("Removing the out message to avoid some uncertain behavior");
-                    exchange.setOut(null);
-                }
+        protected void prepareExchangeForRedelivery() {
+            if (!redeliveryEnabled) {
+                throw new IllegalStateException("Redelivery is not enabled on " + RedeliveryErrorHandler.this + ". Make sure you have configured the error handler properly.");
             }
+            // there must be a defensive copy of the exchange
+            ObjectHelper.notNull(this.original, "Defensive copy of Exchange is null", RedeliveryErrorHandler.this);
+
+            // okay we will give it another go so clear the exception so we can try again
+            exchange.setException(null);
+
+            // clear rollback flags
+            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
 
+            // TODO: We may want to store these as state on RedeliveryState so we keep them in case end user messes with Exchange
+            // and then put these on the exchange when doing a redelivery / fault processor
+
+            // preserve these headers
+            Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+            Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
+            Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
+
+            // we are redelivering so copy from original back to exchange
+            exchange.getIn().copyFrom(this.original.getIn());
+            exchange.setOut(null);
             // reset cached streams so they can be read again
             MessageHelper.resetStreamCache(exchange.getIn());
 
-            // invoke custom on prepare
-            if (onPrepareProcessor != null) {
-                try {
-                    log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
-                    onPrepareProcessor.process(exchange);
-                } catch (Exception e) {
-                    // a new exception was thrown during prepare
-                    exchange.setException(e);
+            // put back headers
+            if (redeliveryCounter != null) {
+                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
+            }
+            if (redeliveryMaxCounter != null) {
+                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
+            }
+            if (redelivered != null) {
+                exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
+            }
+        }
+
+        protected void handleException() {
+            Exception e = exchange.getException();
+            // e is never null
+
+            Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+            if (previous != null && previous != e) {
+                // a 2nd exception was thrown while handling a previous exception
+                // so we need to add the previous as suppressed by the new exception
+                // see also FatalFallbackErrorHandler
+                Throwable[] suppressed = e.getSuppressed();
+                boolean found = false;
+                for (Throwable t : suppressed) {
+                    if (t == previous) {
+                        found = true;
+                    }
+                }
+                if (!found) {
+                    e.addSuppressed(previous);
                 }
             }
 
-            log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
+            // store the original caused exception in a property, so we can restore it later
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+
+            // find the error handler to use (if any)
+            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
+            if (exceptionPolicy != null) {
+                currentRedeliveryPolicy = ErrorHandlerReifier.createRedeliveryPolicy(exceptionPolicy, exchange.getContext(), currentRedeliveryPolicy);
+                handledPredicate = exceptionPolicy.getHandledPolicy();
+                continuedPredicate = exceptionPolicy.getContinuedPolicy();
+                retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
+                useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
+
+                // route specific failure handler?
+                Processor processor = null;
+                UnitOfWork uow = exchange.getUnitOfWork();
+                if (uow != null && uow.getRouteContext() != null) {
+                    String routeId = uow.getRouteContext().getRoute().getId();
+                    processor = exceptionPolicy.getErrorHandler(routeId);
+                } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
+                    // note this should really not happen, but we have this code as a fail safe
+                    // to be backwards compatible with the old behavior
+                    log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
+                    processor = exceptionPolicy.getErrorHandlers().iterator().next();
+                }
+                if (processor != null) {
+                    failureProcessor = processor;
+                }
 
-            // store the last to endpoint as the failure endpoint
-            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
-            // and store the route id so we know in which route we failed
-            UnitOfWork uow = exchange.getUnitOfWork();
-            if (uow != null && uow.getRouteContext() != null) {
-                exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+                // route specific on redelivery?
+                processor = exceptionPolicy.getOnRedelivery();
+                if (processor != null) {
+                    onRedeliveryProcessor = processor;
+                }
+                // route specific on exception occurred?
+                processor = exceptionPolicy.getOnExceptionOccurred();
+                if (processor != null) {
+                    onExceptionProcessor = processor;
+                }
             }
 
-            // fire event as we had a failure processor to handle it, which there is a event for
-            final boolean deadLetterChannel = processor == deadLetter;
+            // only log if not failure handled or not an exhausted unit of work
+            if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
+                String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
+                        + ". On delivery attempt: " + redeliveryCounter + " caught: " + e;
+                logFailedDelivery(true, false, false, false, isDeadLetterChannel(), exchange, msg, e);
+            }
 
-            EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+            redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+        }
 
-            // the failure processor could also be asynchronous
-            AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
-            sync = afp.process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                    log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
-                    try {
-                        prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
-                        // fire event as we had a failure processor to handle it, which there is a event for
-                        EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
-                    } finally {
-                        // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        data.sync &= sync;
-                        callback.done(data.sync);
-                    }
+        /**
+         * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
+         * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
+         */
+        protected void onExceptionOccurred() {
+            if (onExceptionProcessor == null) {
+                return;
+            }
+
+            // run this synchronously as it's just a Processor
+            try {
+                if (log.isTraceEnabled()) {
+                    log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", onExceptionProcessor, exchange);
                 }
-            });
-        } else {
+                onExceptionProcessor.process(exchange);
+            } catch (Throwable e) {
+                // we do not want a new exception to override the existing one, so log it as a WARN
+                log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
+            }
+            log.trace("OnExceptionOccurred processor done");
+        }
+
+        /**
+         * Gives an optional configured redelivery processor a chance to process before the Exchange
+         * will be redelivered. This can be used to alter the Exchange.
+         */
+        protected void deliverToOnRedeliveryProcessor() {
+            if (onRedeliveryProcessor == null) {
+                return;
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
+                        onRedeliveryProcessor, exchange);
+            }
+
+            // run this synchronously as it's just a Processor
             try {
+                onRedeliveryProcessor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            log.trace("Redelivery processor done");
+        }
+
+        /**
+         * All redelivery attempts failed so move the exchange to the dead letter queue
+         */
+        protected void deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange) {
+            Exception caught = exchange.getException();
+
+            // we did not succeed with the redelivery so now we let the failure processor handle it
+            // clear exception as we let the failure processor handle it
+            exchange.setException(null);
+
+            final boolean shouldHandle = shouldHandle(exchange);
+            final boolean shouldContinue = shouldContinue(exchange);
+
+            // regard both handled or continued as being handled
+            boolean handled = false;
+
+            // always handle if dead letter channel
+            boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
+            if (handleOrContinue) {
+                // it's handled, so remove traces of the attempted redelivery
+                exchange.getIn().removeHeader(Exchange.REDELIVERED);
+                exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
+                exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
+                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
+                // and remove traces of rollback only and uow exhausted markers
+                exchange.removeProperty(Exchange.ROLLBACK_ONLY);
+                exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
+
+                handled = true;
+            } else {
+                // must decrement the redelivery counter as we didn't process the redelivery but it is being
+                // handled by the failure handler. So we must -1 to not let the counter be out-of-sync
+                decrementRedeliveryCounter(exchange);
+            }
+
+            // we should allow using the failure processor if we should not continue
+            // or in case of continue then the failure processor is NOT a dead letter channel
+            // because you can continue and still let the failure processor do some routing
+            // before continue in the main route.
+            boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
+
+            if (allowFailureProcessor && processor != null) {
+
+                // prepare original IN body if it should be moved instead of current body
+                if (useOriginalInMessage) {
+                    log.trace("Using the original IN message instead of current");
+                    Message original = ExchangeHelper.getOriginalInMessage(exchange);
+                    exchange.setIn(original);
+                    if (exchange.hasOut()) {
+                        log.trace("Removing the out message to avoid some uncertain behavior");
+                        exchange.setOut(null);
+                    }
+                }
+
+                // reset cached streams so they can be read again
+                MessageHelper.resetStreamCache(exchange.getIn());
+
                 // invoke custom on prepare
                 if (onPrepareProcessor != null) {
                     try {
@@ -1086,320 +811,390 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         exchange.setException(e);
                     }
                 }
-                // no processor but we need to prepare after failure as well
-                prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
-            } finally {
-                // callback we are done
-                callback.done(data.sync);
-            }
-        }
 
-        // create log message
-        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
-        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
-        if (processor != null) {
-            if (isDeadLetterChannel && deadLetterUri != null) {
-                msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
+                log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
+
+                // store the last to endpoint as the failure endpoint
+                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+                // and store the route id so we know in which route we failed
+                UnitOfWork uow = exchange.getUnitOfWork();
+                if (uow != null && uow.getRouteContext() != null) {
+                    exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+                }
+
+                // fire event as we had a failure processor to handle it, for which there is an event
+                final boolean deadLetterChannel = processor == deadLetter;
+
+                EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+
+                // the failure processor could also be asynchronous
+                AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
+                afp.process(exchange, sync -> {
+                    log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
+                    try {
+                        prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
+                        // fire event as we had a failure processor to handle it, for which there is an event
+                        EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
+                    } finally {
+                        // if the fault was handled asynchronously, this should be reflected in the callback as well
+                        ReactiveHelper.callback(callback);
+                    }
+                });
             } else {
-                msg = msg + ". Processed by failure processor: " + processor;
+                try {
+                    // invoke custom on prepare
+                    if (onPrepareProcessor != null) {
+                        try {
+                            log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
+                            onPrepareProcessor.process(exchange);
+                        } catch (Exception e) {
+                            // a new exception was thrown during prepare
+                            exchange.setException(e);
+                        }
+                    }
+                    // no processor but we need to prepare after failure as well
+                    prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
+                } finally {
+                    // callback we are done
+                    ReactiveHelper.callback(callback);
+                }
             }
-        }
 
-        // log that we failed delivery as we are exhausted
-        logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null);
+            // create log message
+            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+            msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught;
+            if (processor != null) {
+                if (isDeadLetterChannel && deadLetterUri != null) {
+                    msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
+                } else {
+                    msg = msg + ". Processed by failure processor: " + processor;
+                }
+            }
 
-        return sync;
-    }
+            // log that we failed delivery as we are exhausted
+            logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, null);
+        }
 
-    protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
-                                               final boolean shouldHandle, final boolean shouldContinue) {
+        protected void prepareExchangeAfterFailure(final Exchange exchange, final boolean isDeadLetterChannel,
+                                                   final boolean shouldHandle, final boolean shouldContinue) {
 
-        Exception newException = exchange.getException();
+            Exception newException = exchange.getException();
 
-        // we could not process the exchange so we let the failure processor handled it
-        ExchangeHelper.setFailureHandled(exchange);
+            // we could not process the exchange so we let the failure processor handle it
+            ExchangeHelper.setFailureHandled(exchange);
 
-        // honor if already set a handling
-        boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
-        if (alreadySet) {
-            boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
-            log.trace("This exchange has already been marked for handling: {}", handled);
-            if (!handled) {
-                // exception not handled, put exception back in the exchange
-                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
-                // and put failure endpoint back as well
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+            // honor if already set a handling
+            boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
+            if (alreadySet) {
+                boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
+                log.trace("This exchange has already been marked for handling: {}", handled);
+                if (!handled) {
+                    // exception not handled, put exception back in the exchange
+                    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+                    // and put failure endpoint back as well
+                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+                }
+                return;
             }
-            return;
-        }
 
-        // dead letter channel is special
-        if (shouldContinue) {
-            log.trace("This exchange is continued: {}", exchange);
-            // okay we want to continue then prepare the exchange for that as well
-            prepareExchangeForContinue(exchange, data, isDeadLetterChannel);
-        } else if (shouldHandle) {
-            log.trace("This exchange is handled so its marked as not failed: {}", exchange);
-            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
-        } else {
-            // okay the redelivery policy are not explicit set to true, so we should allow to check for some
-            // special situations when using dead letter channel
-            if (isDeadLetterChannel) {
-
-                // DLC is always handling the first thrown exception,
-                // but if its a new exception then use the configured option
-                boolean handled = newException == null || deadLetterHandleNewException;
-
-                // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
-                // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
-                // to avoid causing endless poison messages that fails forever)
-                if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
-                    String uri = URISupport.sanitizeUri(deadLetterUri);
-                    String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
+            // dead letter channel is special
+            if (shouldContinue) {
+                log.trace("This exchange is continued: {}", exchange);
+                // okay we want to continue then prepare the exchange for that as well
+                prepareExchangeForContinue(exchange, isDeadLetterChannel);
+            } else if (shouldHandle) {
+                log.trace("This exchange is handled so its marked as not failed: {}", exchange);
+                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
+            } else {
+                // okay the redelivery policy is not explicitly set to true, so we should check for some
+                // special situations when using dead letter channel
+                if (isDeadLetterChannel) {
+
+                    // DLC is always handling the first thrown exception,
+                    // but if it's a new exception then use the configured option
+                    boolean handled = newException == null || deadLetterHandleNewException;
+
+                    // when using DLC then log the new exception whether it's being handled or not, as otherwise it may appear as if
+                    // the DLC swallows new exceptions by default (which is by design to ensure the DLC always completes,
+                    // to avoid causing endless poison messages that fail forever)
+                    if (newException != null && currentRedeliveryPolicy.isLogNewException()) {
+                        String uri = URISupport.sanitizeUri(deadLetterUri);
+                        String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
+                        if (handled) {
+                            msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
+                        } else {
+                            msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
+                        }
+                        logFailedDelivery(false, true, handled, false, true, exchange, msg, newException);
+                    }
+
                     if (handled) {
-                        msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
-                    } else {
-                        msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
+                        log.trace("This exchange is handled so its marked as not failed: {}", exchange);
+                        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
+                        return;
                     }
-                    logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException);
                 }
 
-                if (handled) {
-                    log.trace("This exchange is handled so its marked as not failed: {}", exchange);
-                    exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
-                    return;
-                }
+                // not handled by default
+                prepareExchangeAfterFailureNotHandled(exchange);
             }
-
-            // not handled by default
-            prepareExchangeAfterFailureNotHandled(exchange);
         }
-    }
-
-    private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
-        log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
-        // exception not handled, put exception back in the exchange
-        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
-        exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
-        // and put failure endpoint back as well
-        exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
-        // and store the route id so we know in which route we failed
-        UnitOfWork uow = exchange.getUnitOfWork();
-        if (uow != null && uow.getRouteContext() != null) {
-            exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
-        }
-    }
 
-    private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
-                                   Exchange exchange, String message, RedeliveryData data, Throwable e) {
-        if (logger == null) {
-            return;
+        private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
+            log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
+            // exception not handled, put exception back in the exchange
+            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
+            exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+            // and put failure endpoint back as well
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+            // and store the route id so we know in which route we failed
+            UnitOfWork uow = exchange.getUnitOfWork();
+            if (uow != null && uow.getRouteContext() != null) {
+                exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
+            }
         }
 
-        if (!exchange.isRollbackOnly()) {
-            if (newException && !data.currentRedeliveryPolicy.isLogNewException()) {
-                // do not log new exception
+        private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
+                                       Exchange exchange, String message, Throwable e) {
+            if (logger == null) {
                 return;
             }
 
-            // if we should not rollback, then check whether logging is enabled
+            if (!exchange.isRollbackOnly()) {
+                if (newException && !currentRedeliveryPolicy.isLogNewException()) {
+                    // do not log new exception
+                    return;
+                }
 
-            if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) {
-                // do not log handled
-                return;
-            }
+                // if we should not rollback, then check whether logging is enabled
 
-            if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) {
-                // do not log handled
-                return;
-            }
+                if (!newException && handled && !currentRedeliveryPolicy.isLogHandled()) {
+                    // do not log handled
+                    return;
+                }
 
-            if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
-                // do not log retry attempts
-                return;
-            }
+                if (!newException && continued && !currentRedeliveryPolicy.isLogContinued()) {
+                    // do not log continued
+                    return;
+                }
 
-            if (!newException && shouldRedeliver) {
-                if (data.currentRedeliveryPolicy.isLogRetryAttempted()) {
-                    if ((data.currentRedeliveryPolicy.getRetryAttemptedLogInterval() > 1) && (data.redeliveryCounter % data.currentRedeliveryPolicy.getRetryAttemptedLogInterval()) != 0) {
-                        // do not log retry attempt because it is excluded by the retryAttemptedLogInterval
+                if (!newException && shouldRedeliver && !currentRedeliveryPolicy.isLogRetryAttempted()) {
+                    // do not log retry attempts
+                    return;
+                }
+
+                if (!newException && shouldRedeliver) {
+                    if (currentRedeliveryPolicy.isLogRetryAttempted()) {
+                        if ((currentRedeliveryPolicy.getRetryAttemptedLogInterval() > 1) && (redeliveryCounter % currentRedeliveryPolicy.getRetryAttemptedLogInterval()) != 0) {
+                            // do not log retry attempt because it is excluded by the retryAttemptedLogInterval
+                            return;
+                        }
+                    } else {
+                        // do not log retry attempts
                         return;
                     }
-                } else {
-                    // do not log retry attempts
+                }
+
+                if (!newException && !shouldRedeliver && !currentRedeliveryPolicy.isLogExhausted()) {
+                    // do not log exhausted
                     return;
                 }
             }
 
-            if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
-                // do not log exhausted
-                return;
+            LoggingLevel newLogLevel;
+            boolean logStackTrace;
+            if (exchange.isRollbackOnly()) {
+                newLogLevel = currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+                logStackTrace = currentRedeliveryPolicy.isLogStackTrace();
+            } else if (shouldRedeliver) {
+                newLogLevel = currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+                logStackTrace = currentRedeliveryPolicy.isLogRetryStackTrace();
+            } else {
+                newLogLevel = currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+                logStackTrace = currentRedeliveryPolicy.isLogStackTrace();
+            }
+            if (e == null) {
+                e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
             }
-        }
 
-        LoggingLevel newLogLevel;
-        boolean logStackTrace;
-        if (exchange.isRollbackOnly()) {
-            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
-            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
-        } else if (shouldRedeliver) {
-            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
-            logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
-        } else {
-            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
-            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
-        }
-        if (e == null) {
-            e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
-        }
+            if (newException) {
+                // log at most WARN level
+                if (newLogLevel == LoggingLevel.ERROR) {
+                    newLogLevel = LoggingLevel.WARN;
+                }
+                String msg = message;
+                if (msg == null) {
+                    msg = "New exception " + ExchangeHelper.logIds(exchange);
+                    // special for logging the new exception
+                    Throwable cause = e;
+                    if (cause != null) {
+                        msg = msg + " due: " + cause.getMessage();
+                    }
+                }
 
-        if (newException) {
-            // log at most WARN level
-            if (newLogLevel == LoggingLevel.ERROR) {
-                newLogLevel = LoggingLevel.WARN;
-            }
-            String msg = message;
-            if (msg == null) {
-                msg = "New exception " + ExchangeHelper.logIds(exchange);
-                // special for logging the new exception
-                Throwable cause = e;
+                if (e != null && logStackTrace) {
+                    logger.log(msg, e, newLogLevel);
+                } else {
+                    logger.log(msg, newLogLevel);
+                }
+            } else if (exchange.isRollbackOnly()) {
+                String msg = "Rollback " + ExchangeHelper.logIds(exchange);
+                Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
                 if (cause != null) {
                     msg = msg + " due: " + cause.getMessage();
                 }
-            }
 
-            if (e != null && logStackTrace) {
-                logger.log(msg, e, newLogLevel);
+                // should we include message history
+                if (!shouldRedeliver && currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
+                    // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+                    ExchangeFormatter formatter = customExchangeFormatter
+                            ? exchangeFormatter : (currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
+                    if (routeStackTrace != null) {
+                        msg = msg + "\n" + routeStackTrace;
+                    }
+                }
+
+                if (newLogLevel == LoggingLevel.ERROR) {
+                    // log intended rollback on maximum WARN level (not ERROR)
+                    logger.log(msg, LoggingLevel.WARN);
+                } else {
+                    // otherwise use the desired logging level
+                    logger.log(msg, newLogLevel);
+                }
             } else {
-                logger.log(msg, newLogLevel);
-            }
-        } else if (exchange.isRollbackOnly()) {
-            String msg = "Rollback " + ExchangeHelper.logIds(exchange);
-            Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
-            if (cause != null) {
-                msg = msg + " due: " + cause.getMessage();
-            }
+                String msg = message;
+                // should we include message history
+                if (!shouldRedeliver && currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
+                    // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+                    ExchangeFormatter formatter = customExchangeFormatter
+                            ? exchangeFormatter : (currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
+                    if (routeStackTrace != null) {
+                        msg = msg + "\n" + routeStackTrace;
+                    }
+                }
 
-            // should we include message history
-            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
-                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
-                ExchangeFormatter formatter = customExchangeFormatter
-                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
-                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
-                if (routeStackTrace != null) {
-                    msg = msg + "\n" + routeStackTrace;
+                if (e != null && logStackTrace) {
+                    logger.log(msg, e, newLogLevel);
+                } else {
+                    logger.log(msg, newLogLevel);
                 }
             }
+        }
 
-            if (newLogLevel == LoggingLevel.ERROR) {
-                // log intended rollback on maximum WARN level (no ERROR)
-                logger.log(msg, LoggingLevel.WARN);
-            } else {
-                // otherwise use the desired logging level
-                logger.log(msg, newLogLevel);
-            }
-        } else {
-            String msg = message;
-            // should we include message history
-            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
-                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
-                ExchangeFormatter formatter = customExchangeFormatter
-                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
-                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
-                if (routeStackTrace != null) {
-                    msg = msg + "\n" + routeStackTrace;
-                }
+        /**
+         * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
+         * <p/>
+         * If the exchange is exhausted, then we will not continue processing, but let the
+         * failure processor deal with the exchange.
+         *
+         * @param exchange the current exchange
+         * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
+         */
+        private boolean isExhausted(Exchange exchange) {
+            // if marked as redelivery exhausted then do not continue/redeliver
+            boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
+            if (exhausted) {
+                log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
+                return true;
             }
 
-            if (e != null && logStackTrace) {
-                logger.log(msg, e, newLogLevel);
-            } else {
-                logger.log(msg, newLogLevel);
+            // if marked as rollback only then do not continue/redeliver
+            boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
+            if (rollbackOnly) {
+                log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
+                return true;
             }
+            // it's the first original call, so continue
+            if (redeliveryCounter == 0) {
+                return false;
+            }
+            // it's a potential redelivery, so determine if we should redeliver or not
+            boolean redeliver = currentRedeliveryPolicy.shouldRedeliver(exchange, redeliveryCounter, retryWhilePredicate);
+            return !redeliver;
         }
-    }
 
-    /**
-     * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
-     * <p/>
-     * If the exchange is exhausted, then we will not continue processing, but let the
-     * failure processor deal with the exchange.
-     *
-     * @param exchange the current exchange
-     * @param data     the redelivery data
-     * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
-     */
-    private boolean isExhausted(Exchange exchange, RedeliveryData data) {
-        // if marked as rollback only then do not continue/redeliver
-        boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
-        if (exhausted) {
-            log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
-            return true;
+        /**
+         * Determines whether or not to continue if we are exhausted.
+         *
+         * @param exchange the current exchange
+         * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
+         */
+        private boolean shouldContinue(Exchange exchange) {
+            if (continuedPredicate != null) {
+                return continuedPredicate.matches(exchange);
+            }
+            // do not continue by default
+            return false;
         }
 
-        // if marked as rollback only then do not continue/redeliver
-        boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
-        if (rollbackOnly) {
-            log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
-            return true;
-        }
-        // its the first original call so continue
-        if (data.redeliveryCounter == 0) {
+        /**
+         * Determines whether or not to handle if we are exhausted.
+         *
+         * @param exchange the current exchange
+         * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
+         */
+        private boolean shouldHandle(Exchange exchange) {
+            if (handledPredicate != null) {
+                return handledPredicate.matches(exchange);
+            }
+            // do not handle by default
             return false;
         }
-        // its a potential redelivery so determine if we should redeliver or not
-        boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
-        return !redeliver;
-    }
 
-    /**
-     * Determines whether or not to continue if we are exhausted.
-     *
-     * @param exchange the current exchange
-     * @param data     the redelivery data
-     * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
-     */
-    private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
-        if (data.continuedPredicate != null) {
-            return data.continuedPredicate.matches(exchange);
+        /**
+         * Increments the redelivery counter and adds the redelivered flag if the
+         * message has been redelivered
+         */
+        private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
+            Message in = exchange.getIn();
+            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+            int next = counter != null ? counter + 1 : 1;
+            in.setHeader(Exchange.REDELIVERY_COUNTER, next);
+            in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+            // if maximum redeliveries is used, then provide that information as well
+            if (currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
+                in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, currentRedeliveryPolicy.getMaximumRedeliveries());
+            }
+            return next;
         }
-        // do not continue by default
-        return false;
-    }
 
-    /**
-     * Determines whether or not to handle if we are exhausted.
-     *
-     * @param exchange the current exchange
-     * @param data     the redelivery data
-     * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
-     */
-    private boolean shouldHandle(Exchange exchange, RedeliveryData data) {
-        if (data.handledPredicate != null) {
-            return data.handledPredicate.matches(exchange);
-        }
-        // do not handle by default
-        return false;
-    }
+        /**
+         * Method for sleeping during redelivery attempts.
+         * <p/>
+         * This task is for synchronous blocking. If async delayed is used, then a scheduled thread pool
+         * is used for sleeping and triggering redeliveries.
+         */
+        public boolean sleep() throws InterruptedException {
+            // for small delays (less than 1000 millis) just sleep
+            if (redeliveryDelay < 1000) {
+                currentRedeliveryPolicy.sleep(redeliveryDelay);
+                return true;
+            }
 
-    /**
-     * Increments the redelivery counter and adds the redelivered flag if the
-     * message has been redelivered
-     */
-    private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
-        Message in = exchange.getIn();
-        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
-        int next = 1;
-        if (counter != null) {
-            next = counter + 1;
-        }
-        in.setHeader(Exchange.REDELIVERY_COUNTER, next);
-        in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
-        // if maximum redeliveries is used, then provide that information as well
-        if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
-            in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
+            StopWatch watch = new StopWatch();
+
+            log.debug("Sleeping for: {} millis until attempting redelivery", redeliveryDelay);
+            while (watch.taken() < redeliveryDelay) {
+                // sleep using 1 sec interval
+
+                long delta = redeliveryDelay - watch.taken();
+                long max = Math.min(1000, delta);
+                if (max > 0) {
+                    log.trace("Sleeping for: {} millis until waking up for re-check", max);
+                    Thread.sleep(max);
+                }
+
+                // if we are preparing for shutdown, then only do redelivery if allowed
+                if (preparingShutdown && !currentRedeliveryPolicy.isAllowRedeliveryWhileStopping()) {
+                    log.debug("Rejected redelivery while stopping");
+                    return false;
+                }
+            }
+
+            return true;
         }
-        return next;
     }
 
     /**
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
index ffb2c58..a779716 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -44,6 +44,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.ExpressionComparator;
 import org.apache.camel.support.LoggingExceptionHandler;
@@ -73,7 +74,7 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
     private Expression expression;
 
     private final CamelContext camelContext;
-    private final Processor processor;
+    private final AsyncProcessor processor;
     private final Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
 
@@ -96,7 +97,7 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
 
         // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
         this.camelContext = camelContext;
-        this.processor = processor;
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.collection = collection;
         this.expression = expression;
         this.sender = new BatchSender();
@@ -293,8 +294,11 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
      * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
      * custom processing before or after an individual exchange is processed
      */
-    protected void processExchange(Exchange exchange) throws Exception {
-        processor.process(exchange);
+    protected void processExchange(Exchange exchange) {
+        processor.process(exchange, sync -> postProcess(exchange));
+    }
+
+    protected void postProcess(Exchange exchange) {
         if (exchange.getException() != null) {
             getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
         }
@@ -519,13 +523,8 @@ public class Resequencer extends ServiceSupport implements AsyncProcessor, Navig
             while (iter.hasNext()) {
                 Exchange exchange = iter.next();
                 iter.remove();
-                try {
-                    log.debug("Sending aggregated exchange: {}", exchange);
-                    processExchange(exchange);
-                } catch (Throwable t) {
-                    // must catch throwable to avoid growing memory
-                    getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
-                }
+                log.debug("Sending aggregated exchange: {}", exchange);
+                processExchange(exchange);
             }
         }
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 05e9a1e..a928f49 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -399,7 +399,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                                 }
                             } catch (Exception e) {
                                 // error resolving endpoint so we should break out
-                                ex.setException(e);
+                                current.setException(e);
                                 break;
                             }
 
@@ -426,7 +426,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
                     // okay we are completely done with the routing slip
                     // so we need to signal done on the original callback so it can continue
-                    originalCallback.done(false);
+                    cb.done(false);
                 }
             });
         });
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index b641ee5..b711ad7 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -124,14 +124,14 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
             final Exchange target = configureExchange(exchange, pattern);
 
             final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), target, destination);
-            StopWatch sw = null;
+            // record timing for sending the exchange using the producer
+            StopWatch watch;
             if (sending) {
-                sw = new StopWatch();
+                watch = new StopWatch();
+            } else {
+                watch = null;
             }
 
-            // record timing for sending the exchange using the producer
-            final StopWatch watch = sw;
-
             try {
                 log.debug(">>>> {} {}", destination, exchange);
                 return producer.process(exchange, new AsyncCallback() {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 785bfcc..9848a6b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -33,6 +33,7 @@ import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.ReactiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,30 +82,23 @@ public class SharedCamelInternalProcessor {
      */
     public void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor) {
         final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        boolean sync = process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (!doneSync) {
-                    awaitManager.countDown(exchange, latch);
-                }
+        awaitManager.process(new AsyncProcessor() {
+            @Override
+            public boolean process(Exchange exchange, AsyncCallback callback) {
+                return SharedCamelInternalProcessor.this.process(exchange, callback, processor, resultProcessor);
             }
 
             @Override
-            public String toString() {
-                return "Done " + processor;
+            public void process(Exchange exchange) throws Exception {
+                throw new IllegalStateException();
             }
-        }, processor, resultProcessor);
-
-        if (!sync) {
-            awaitManager.await(exchange, latch);
-        }
+        }, exchange);
     }
 
     /**
      * Asynchronous API
      */
-    public boolean process(Exchange exchange, AsyncCallback callback, AsyncProcessor processor, Processor resultProcessor) {
+    public boolean process(Exchange exchange, AsyncCallback ocallback, AsyncProcessor processor, Processor resultProcessor) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -121,7 +115,7 @@ public class SharedCamelInternalProcessor {
 
         if (processor == null || !continueProcessing(exchange, processor)) {
             // no processor or we should not continue then we are done
-            callback.done(true);
+            ocallback.done(true);
             return true;
         }
 
@@ -135,13 +129,13 @@ public class SharedCamelInternalProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                callback.done(true);
+                ocallback.done(true);
                 return true;
             }
         }
 
         // create internal callback which will execute the advices in reverse order when done
-        callback = new InternalCallback(states, exchange, callback, resultProcessor);
+        AsyncCallback callback = new InternalCallback(states, exchange, ocallback, resultProcessor);
 
         // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
         Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
@@ -188,15 +182,17 @@ public class SharedCamelInternalProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            // execute any after processor work (in current thread, not in the callback)
-            if (uow != null) {
-                uow.afterProcess(processor, exchange, callback, sync);
-            }
+            ReactiveHelper.scheduleLast(() -> {
+                // execute any after processor work (in this task scheduled via ReactiveHelper.scheduleLast, not in the callback)
+                if (uow != null) {
+                    uow.afterProcess(processor, exchange, callback, sync);
+                }
 
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
-                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
-            }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
+                            exchange.getExchangeId(), exchange);
+                }
+            }, "SharedCamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
             return sync;
         }
     }
@@ -249,7 +245,7 @@ public class SharedCamelInternalProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                callback.done(doneSync);
+                ReactiveHelper.callback(callback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index a535efb..a804f62 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -119,12 +119,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
             throw exchange.getException();
         }
 
-        Iterable<ProcessorExchangePair> answer;
-        if (isStreaming()) {
-            answer = createProcessorExchangePairsIterable(exchange, value);
-        } else {
-            answer = createProcessorExchangePairsList(exchange, value);
-        }
+        Iterable<ProcessorExchangePair> answer = isStreaming()
+                ? createProcessorExchangePairsIterable(exchange, value)
+                : createProcessorExchangePairsList(exchange, value);
         if (exchange.getException() != null) {
             // force any exceptions occurred during creation of exchange paris to be thrown
             // before returning the answer;
@@ -245,8 +242,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
-    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
-                                     Iterator<ProcessorExchangePair> it) {
+    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
         // do not share unit of work
         exchange.setUnitOfWork(null);
 
@@ -255,7 +251,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
             // non streaming mode, so we know the total size already
             exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
         }
-        if (it.hasNext()) {
+        if (hasNext) {
             exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
         } else {
             exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
index 37dd7ee..7414f4c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
@@ -82,10 +82,6 @@ public class ThroughputLogger extends ServiceSupport implements AsyncProcessor,
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(Exchange exchange, AsyncCallback callback) {
         if (startTime == 0) {
             startTime = System.currentTimeMillis();
         }
@@ -98,7 +94,14 @@ public class ThroughputLogger extends ServiceSupport implements AsyncProcessor,
                 logger.log(lastLogMessage);
             }
         }
+    }
 
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
         callback.done(true);
         return true;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
index 56cc9c5..e8178b1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.ServiceHelper;
 import org.apache.camel.support.ServiceSupport;
 
@@ -64,80 +65,49 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        Iterator<Processor> processors = next().iterator();
 
-        Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
-        exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
+        ReactiveHelper.schedule(new TryState(exchange, callback));
+        return false;
+    }
 
-        while (continueRouting(processors, exchange)) {
-            exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
-            ExchangeHelper.prepareOutToIn(exchange);
+    class TryState implements Runnable {
 
-            // process the next processor
-            Processor processor = processors.next();
-            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
-            boolean sync = process(exchange, callback, processors, async, lastHandled);
+        final Exchange exchange;
+        final AsyncCallback callback;
+        final Iterator<Processor> processors;
+        final Object lastHandled;
 
-            // continue as long its being processed synchronously
-            if (!sync) {
-                log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-                // the remainder of the try .. catch .. finally will be completed async
-                // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                return false;
-            }
-
-            log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+        public TryState(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.processors = next().iterator();
+            this.lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
+            exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
         }
 
-        ExchangeHelper.prepareOutToIn(exchange);
-        exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
-        exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
-        log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-        callback.done(true);
-        return true;
-    }
-
-    protected boolean process(final Exchange exchange, final AsyncCallback callback,
-                              final Iterator<Processor> processors, final AsyncProcessor processor,
-                              final Object lastHandled) {
-        // this does the actual processing so log at trace level
-        log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
-        // implement asynchronous routing logic in callback so we can have the callback being
-        // triggered and then continue routing where we left
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                // we only have to handle async completion of the pipeline
-                if (doneSync) {
-                    return;
-                }
-
-                // continue processing the try .. catch .. finally asynchronously
-                while (continueRouting(processors, exchange)) {
-                    exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
-                    ExchangeHelper.prepareOutToIn(exchange);
-
-                    // process the next processor
-                    AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
-                    doneSync = process(exchange, callback, processors, processor, lastHandled);
-
-                    if (!doneSync) {
-                        log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
-                        // the remainder of the try .. catch .. finally will be completed async
-                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
-                        return;
-                    }
-                }
+        @Override
+        public void run() {
+            if (continueRouting(processors, exchange)) {
+                exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+                ExchangeHelper.prepareOutToIn(exchange);
 
+                // process the next processor
+                Processor processor = processors.next();
+                AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
+                log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+            } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
                 exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
                 log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
                 callback.done(false);
             }
-        });
+        }
 
-        return sync;
+        public String toString() {
+            return "TryState[" + exchange.getExchangeId() + "]";
+        }
     }
 
     protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 40de8e6..b2d829c 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -251,19 +251,6 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann
                         + " See more details at the InterceptStrategy javadoc."
                         + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
                         + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
-
-                // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
-                // however its not the most optimal solution, but we can still run.
-                InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
-                wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
-                // Avoid the stack overflow
-                if (!wrapped.equals(bridge)) {
-                    bridge.setTarget(wrapped);
-                } else {
-                    // Just skip the wrapped processor
-                    bridge.setTarget(null);
-                }
-                wrapped = bridge;
             }
             if (!(wrapped instanceof WrapProcessor)) {
                 // wrap the target so it becomes a service and we can manage its lifecycle
diff --git a/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java b/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
index 645fe67..5e92eea 100644
--- a/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
+++ b/camel-core/src/main/java/org/apache/camel/reifier/DelayReifier.java
@@ -38,7 +38,7 @@ class DelayReifier extends ExpressionReifier<DelayDefinition> {
         Processor childProcessor = this.createChildProcessor(routeContext, false);
         Expression delay = createAbsoluteTimeDelayExpression(routeContext);
 
-        boolean async = definition.getAsyncDelayed() != null && definition.getAsyncDelayed();
+        boolean async = definition.getAsyncDelayed() == null || definition.getAsyncDelayed();
         boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, definition, async);
         ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", definition, async);
 
diff --git a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
index 70c00f7..cea7343 100644
--- a/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
@@ -46,23 +46,7 @@ public final class AsyncProcessorHelper {
      */
     public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
         final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-        boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (!doneSync) {
-                    awaitManager.countDown(exchange, latch);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return "Done " + processor;
-            }
-        });
-        if (!sync) {
-            awaitManager.await(exchange, latch);
-        }
+        awaitManager.process(processor, exchange);
     }
 
 }
diff --git a/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
new file mode 100644
index 0000000..f5a66e6
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/support/ReactiveHelper.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper that runs scheduled tasks from a queue private to the calling thread.
+ * <p/>
+ * The outermost {@code schedule} call on a thread drains that queue in a loop. A task
+ * scheduled while the loop is running is only queued and is run by the loop afterwards,
+ * rather than being invoked from inside the task that scheduled it.
+ */
+public final class ReactiveHelper {
+
+    private static final ThreadLocal<Worker> WORKERS = ThreadLocal.withInitial(Worker::new);
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReactiveHelper.class);
+
+    private ReactiveHelper() {
+    }
+
+    /**
+     * Parks the tasks still queued on the current thread, then schedules the given task at
+     * the head of a fresh queue. The parked tasks resume once that queue has been drained.
+     */
+    public static void scheduleMain(Runnable runnable) {
+        WORKERS.get().schedule(runnable, true, true);
+    }
+
+    /**
+     * Same as {@link #scheduleMain(Runnable)}; the description becomes the
+     * {@code toString()} of the scheduled task.
+     */
+    public static void scheduleMain(Runnable runnable, String description) {
+        WORKERS.get().schedule(describe(runnable, description), true, true);
+    }
+
+    /**
+     * Schedules the given task at the head of the current thread's queue.
+     */
+    public static void schedule(Runnable runnable) {
+        WORKERS.get().schedule(runnable, true, false);
+    }
+
+    /**
+     * Same as {@link #schedule(Runnable)}; the description becomes the {@code toString()} of the task.
+     */
+    public static void schedule(Runnable runnable, String description) {
+        WORKERS.get().schedule(describe(runnable, description), true, false);
+    }
+
+    /**
+     * Schedules the given task at the tail of the current thread's queue.
+     */
+    public static void scheduleLast(Runnable runnable, String description) {
+        WORKERS.get().schedule(describe(runnable, description), false, false);
+    }
+
+    /**
+     * Runs the task at the head of the current thread's queue, if there is one.
+     *
+     * @return {@code true} if a task was run, {@code false} if the queue was empty
+     */
+    public static boolean executeFromQueue() {
+        return WORKERS.get().executeFromQueue();
+    }
+
+    /**
+     * Schedules a task that invokes {@code callback.done(false)}.
+     */
+    public static void callback(AsyncCallback callback) {
+        schedule(new Runnable() {
+            @Override
+            public void run() {
+                callback.done(false);
+            }
+            @Override
+            public String toString() {
+                return "Callback[" + callback + "]";
+            }
+        });
+    }
+
+    // wraps the task so that its toString() yields the given description
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
+    /**
+     * Task queue bound to one thread through {@link #WORKERS}. Instances are only reached
+     * via that thread-local, so the state is not synchronized.
+     */
+    private static class Worker {
+
+        // tasks waiting to run, head first
+        private LinkedList<Runnable> queue = new LinkedList<>();
+        // queues parked by a "main" schedule, most recently parked first; created lazily
+        private LinkedList<LinkedList<Runnable>> back;
+        // true while a schedule call further up the stack is draining the queue
+        private boolean running;
+
+        public void schedule(Runnable runnable, boolean first, boolean main) {
+            if (main && !queue.isEmpty()) {
+                if (back == null) {
+                    back = new LinkedList<>();
+                }
+                back.push(queue);
+                queue = new LinkedList<>();
+            }
+            if (first) {
+                queue.addFirst(runnable);
+            } else {
+                queue.addLast(runnable);
+            }
+            if (running) {
+                // the loop further up the stack will pick the task up
+                LOG.debug("Queuing reactive work: {}", runnable);
+                return;
+            }
+            running = true;
+            try {
+                for (; ; ) {
+                    final Runnable polled = queue.poll();
+                    if (polled == null) {
+                        if (back == null || back.isEmpty()) {
+                            break;
+                        }
+                        // current queue is drained: resume the most recently parked one
+                        queue = back.poll();
+                        continue;
+                    }
+                    try {
+                        polled.run();
+                    } catch (Throwable t) {
+                        // a failing task must not stop the remaining tasks from running
+                        LOG.warn("Error executing reactive work due to {}. This exception is ignored.", t.getMessage(), t);
+                    }
+                }
+            } finally {
+                running = false;
+            }
+        }
+
+        public boolean executeFromQueue() {
+            final Runnable polled = queue.poll();
+            if (polled == null) {
+                return false;
+            }
+            Thread thread = Thread.currentThread();
+            String name = thread.getName();
+            try {
+                // expose the running task in the thread name for the duration of the call
+                thread.setName(name + " - " + polled.toString());
+                polled.run();
+            } catch (Throwable t) {
+                LOG.warn("Error executing reactive work due to {}. This exception is ignored.", t.getMessage(), t);
+            } finally {
+                thread.setName(name);
+            }
+            return true;
+        }
+
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
index bbaf72b..b856ac0 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
@@ -39,10 +39,10 @@ public class SedaBlockWhenFullTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from(BLOCK_WHEN_FULL_URI).delay(DELAY_LONG).to(MOCK_URI);
+                from(BLOCK_WHEN_FULL_URI).delay(DELAY_LONG).syncDelayed().to(MOCK_URI);
 
                 // use same delay as above on purpose
-                from(DEFAULT_URI).delay(DELAY).to("mock:whatever");
+                from(DEFAULT_URI).delay(DELAY).syncDelayed().to("mock:whatever");
             }
         };
     }
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
index 3006ff8..f9232ea 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDefaultBlockWhenFullTest.java
@@ -27,8 +27,8 @@ import org.junit.Test;
 public class SedaDefaultBlockWhenFullTest extends ContextTestSupport {
 
     private static final int QUEUE_SIZE = 1;
-    private static final int DELAY = 10;
-    private static final int DELAY_LONG = 100;
+    private static final int DELAY = 100;
+    private static final int DELAY_LONG = 1000;
     private static final String MOCK_URI = "mock:blockWhenFullOutput";
     private static final String SIZE_PARAM = "?size=%d";
     private static final String BLOCK_WHEN_FULL_URI = "seda:blockingFoo" + String.format(SIZE_PARAM, QUEUE_SIZE) + "&timeout=0";
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
index 2d20cb9..e29ae9f 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleConsumersSupportTest.java
@@ -16,13 +16,20 @@
  */
 package org.apache.camel.impl;
 
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
 import org.apache.camel.FailedToStartRouteException;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
 import org.junit.Test;
@@ -64,6 +71,7 @@ public class MultipleConsumersSupportTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 MyOtherEndpoint my = new MyOtherEndpoint();
+                my.setCamelContext(context);
 
                 from(my).to("mock:a");
                 from(my).to("mock:b");
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
similarity index 61%
copy from camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
copy to camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
index e7a000d..bbb0772 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesAsyncDelayShutdownGracefulTest.java
@@ -16,23 +16,19 @@
  */
 package org.apache.camel.impl;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
-
-    private static String foo = "";
-    private static CountDownLatch latch = new CountDownLatch(1);
+public class PendingExchangesAsyncDelayShutdownGracefulTest extends ContextTestSupport {
 
     @Test
     public void testShutdownGraceful() throws Exception {
-        getMockEndpoint("mock:foo").expectedMinimumMessageCount(1);
+        MockEndpoint result = getMockEndpoint("mock:result");
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+
+        foo.expectedMinimumMessageCount(1);
 
         template.sendBody("seda:foo", "A");
         template.sendBody("seda:foo", "B");
@@ -42,12 +38,9 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
-        // now stop the route before its complete
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
         context.stop();
 
-        // it should wait as there was 1 inflight exchange and 4 pending messages left
-        assertEquals("Should graceful shutdown", "ABCDE", foo);
+        assertEquals("Expecting all messages", 5, result.getReceivedCounter());
     }
 
     @Override
@@ -55,12 +48,10 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:foo").to("mock:foo").delay(500).process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        foo = foo + exchange.getIn().getBody(String.class);
-                        latch.countDown();
-                    }
-                });
+                from("seda:foo")
+                        .to("mock:foo")
+                        .delay(1000)
+                        .to("mock:result");
             }
         };
     }
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
index e7a000d..e12823a 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesShutdownGracefulTest.java
@@ -55,7 +55,7 @@ public class PendingExchangesShutdownGracefulTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:foo").to("mock:foo").delay(500).process(new Processor() {
+                from("seda:foo").to("mock:foo").delay(500).syncDelayed().process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         foo = foo + exchange.getIn().getBody(String.class);
                         latch.countDown();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
index db6fcad..839fc78 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/PendingExchangesTwoRouteShutdownGracefulTest.java
@@ -63,14 +63,14 @@ public class PendingExchangesTwoRouteShutdownGracefulTest extends ContextTestSup
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:foo").to("mock:foo").delay(100).process(new Processor() {
+                from("seda:foo").to("mock:foo").delay(100).syncDelayed().process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         foo = foo + exchange.getIn().getBody(String.class);
                         latch.countDown();
                     }
                 });
 
-                from("seda:bar").to("mock:bar").delay(50).process(new Processor() {
+                from("seda:bar").to("mock:bar").delay(50).syncDelayed().process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         bar = bar + exchange.getIn().getBody(String.class);
                         latch.countDown();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java b/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
index 4f45c1a..9605543 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/event/EventNotifierFailureHandledEventsTest.java
@@ -20,6 +20,8 @@ import java.util.List;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.DelegateProcessor;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.event.CamelContextStartedEvent;
@@ -100,7 +102,11 @@ public class EventNotifierFailureHandledEventsTest extends ContextTestSupport {
         assertEquals("should be DLC", true, e.isDeadLetterChannel());
         assertTrue("should be marked as failure handled", e.isHandled());
         assertFalse("should not be continued", e.isContinued());
-        SendProcessor send = assertIsInstanceOf(SendProcessor.class, e.getFailureHandler());
+        Processor fh = e.getFailureHandler();
+        if (fh.getClass().getName().endsWith("ProcessorToReactiveProcessorBridge")) {
+            fh = ((DelegateProcessor) fh).getProcessor();
+        }
+        SendProcessor send = assertIsInstanceOf(SendProcessor.class, fh);
         assertEquals("mock://dead", send.getDestination().getEndpointUri());
         assertEquals("mock://dead", e.getDeadLetterUri());
 
diff --git a/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
index f20b06c..21c4dbe 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/CharlesSplitAndTryCatchRollbackIssueTest.java
@@ -73,7 +73,7 @@ public class CharlesSplitAndTryCatchRollbackIssueTest extends ContextTestSupport
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 2."));
+            assertTrue(ee.getMessage().startsWith("Multicast processing failed for number 2."));
             RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
             assertTrue(re.getMessage().startsWith("Intended rollback"));
         }
@@ -96,7 +96,7 @@ public class CharlesSplitAndTryCatchRollbackIssueTest extends ContextTestSupport
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException ee = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(ee.getMessage().startsWith("Sequential processing failed for number 3."));
+            assertTrue(ee.getMessage().startsWith("Multicast processing failed for number 3."));
             RollbackExchangeException re = assertIsInstanceOf(RollbackExchangeException.class, ee.getCause());
             assertTrue(re.getMessage().startsWith("Intended rollback"));
         }
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
index 736ebe0..b4d96f0 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -50,10 +50,13 @@ public class RecipientListParallelWithAggregationStrategyThrowingExceptionTest e
                 // must use share UoW if we want the error handler to react on
                 // exceptions
                 // from the aggregation strategy also.
-                from("direct:start").
-                recipientList(header("recipients")).aggregationStrategy(new MyAggregateBean()).
-                parallelProcessing().stopOnAggregateException().shareUnitOfWork()
-                .end()
+                from("direct:start")
+                    .recipientList(header("recipients"))
+                        .aggregationStrategy(new MyAggregateBean())
+                        .parallelProcessing()
+                        .stopOnAggregateException()
+                        .shareUnitOfWork()
+                        .end()
                     .to("mock:end");
             }
         };
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
index 7f46e3a..727c230 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
@@ -58,18 +58,6 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
             }
             assertMockEndpointsSatisfied();
         }
-        List<Thread> aggregatorThreads = getAggregatorThreads();
-        assertEquals(1, aggregatorThreads.size());
-    }
-
-    private List<Thread> getAggregatorThreads() {
-        List<Thread> result = new ArrayList<>();
-        for (Thread t : Thread.getAllStackTraces().keySet()) {
-            if (t.getName().endsWith("Splitter-AggregateTask")) {
-                result.add(t);
-            }
-        }
-        return result;
     }
 
     @Override
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
index 8b55afd..83f341f 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
@@ -83,7 +83,7 @@ public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSuppo
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(10).redeliveryDelay(10))
+                from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(10).redeliveryDelay(100L))
                         .to("log:before")
                         // will use our custom pool
                         .threads()
@@ -94,7 +94,7 @@ public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSuppo
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws Exception {
-                                latch.await(5, TimeUnit.SECONDS);
+                                latch.await(500, TimeUnit.MILLISECONDS);
                             }
                         })
                         .to("log:after")
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
index 7e1a838..d29e7f4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncCopyTest.java
@@ -20,9 +20,6 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
-/**
- * @version 
- */
 public class LoopAsyncCopyTest extends ContextTestSupport {
 
     @Test
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
index 381a564..507ed14 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopAsyncNoCopyTest.java
@@ -20,9 +20,6 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
-/**
- * @version 
- */
 public class LoopAsyncNoCopyTest extends ContextTestSupport {
 
     @Test
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
index 23092ad..6948265 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStopOnExceptionTest.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.camel.processor;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -70,7 +71,7 @@ public class MulticastParallelStopOnExceptionTest extends ContextTestSupport {
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
+            assertTrue(cause.getMessage().startsWith("Multicast processing failed for number "));
             assertEquals("Forced", cause.getCause().getMessage());
 
             String body = cause.getExchange().getIn().getBody(String.class);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
index 6ae9480..40d3063 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
@@ -76,7 +76,7 @@ public class MulticastParallelStreamingTest extends ContextTestSupport {
                     .end()
                     .to("mock:result");
 
-                from("direct:a").delay(250).setBody(constant("A"));
+                from("direct:a").delay(250).asyncDelayed().setBody(constant("A"));
 
                 from("direct:b").setBody(constant("B"));
             }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
index c2b82ca..39b4d23 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastStopOnExceptionTest.java
@@ -51,7 +51,7 @@ public class MulticastStopOnExceptionTest extends ContextTestSupport {
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1."));
+            assertTrue(cause.getMessage().startsWith("Multicast processing failed for number 1."));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
index d20f77f..f26885e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/NavigateRouteTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -25,10 +28,12 @@ import org.junit.Test;
 
 /**
  * Unit test for navigating a route (runtime processors, not the model).
+ *
+ * @version
  */
 public class NavigateRouteTest extends ContextTestSupport {
 
-    private static int count;
+    private static List<Processor> processors = new ArrayList<>();
 
     @Test
     public void testNavigateRoute() throws Exception {
@@ -42,7 +47,7 @@ public class NavigateRouteTest extends ContextTestSupport {
         Navigate<Processor> nav = context.getRoutes().get(0).navigate();
         navigateRoute(nav);
 
-        assertEquals("There should be 6 processors to navigate", 6, count);
+        assertEquals("There should be 6 processors to navigate", 6, processors.size());
     }
 
     @SuppressWarnings("unchecked")
@@ -50,9 +55,12 @@ public class NavigateRouteTest extends ContextTestSupport {
         if (!nav.hasNext()) {
             return;
         }
+        if (nav.getClass().getName().endsWith("ProcessorToReactiveProcessorBridge")) {
+            nav = (Navigate) ((Navigate) nav).next().get(0);
+        }
 
         for (Processor child : nav.next()) {
-            count++;
+            processors.add(child);
 
             if (child instanceof SendProcessor) {
                 SendProcessor send = (SendProcessor) child;
@@ -67,7 +75,7 @@ public class NavigateRouteTest extends ContextTestSupport {
             // navigate children
             if (child instanceof Navigate) {
                 navigateRoute((Navigate<Processor>) child);
-            } 
+            }
         }
     }
 
@@ -77,9 +85,9 @@ public class NavigateRouteTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .convertBodyTo(String.class)
-                    .split(body().tokenize(" "))
-                    .to("mock:result");
+                        .convertBodyTo(String.class)
+                        .split(body().tokenize(" "))
+                        .to("mock:result");
             }
         };
     }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
index d42cc53..5cc6240 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListContextScopedOnExceptionIssueTest.java
@@ -42,6 +42,10 @@ public class RecipientListContextScopedOnExceptionIssueTest extends ContextTestS
                                 String routeId = exchange.getUnitOfWork().getRouteContext().getRoute().getId();
                                 assertEquals("fail", routeId);
                             }
+                            @Override
+                            public String toString() {
+                                return "AssertRouteId";
+                            }
                         }).to("mock:error");
 
                 interceptSendToEndpoint("direct*").process(new Processor() {
@@ -49,6 +53,9 @@ public class RecipientListContextScopedOnExceptionIssueTest extends ContextTestS
                         String target = exchange.getIn().getHeader(Exchange.INTERCEPTED_ENDPOINT, String.class);
                         exchange.getIn().setHeader("target", target);
                     }
+                    public String toString() {
+                        return "SetTargetHeader";
+                    }
                 });
 
                 from("direct:start").routeId("start")
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
index 4c4a572..8ae88c6 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteAllTasksTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 public class ShutdownCompleteAllTasksTest extends ContextTestSupport {
 
-    private static String url = "file:target/pending?initialDelay=0&delay=10";
+    private static String url = "file:target/pending?initialDelay=0&delay=10&synchronous=true";
     private static AtomicInteger counter = new AtomicInteger();
     private static CountDownLatch latch = new CountDownLatch(2);
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
index 9ca4ae0..f9a23f7 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 public class ShutdownCompleteCurrentTaskOnlyTest extends ContextTestSupport {
 
-    private static String url = "file:target/pending?initialDelay=0&delay=10";
+    private static String url = "file:target/pending?initialDelay=0&delay=10&synchronous=true";
 
     @Override
     @Before
@@ -65,7 +65,7 @@ public class ShutdownCompleteCurrentTaskOnlyTest extends ContextTestSupport {
                 from(url)
                     // let it complete only current task so we shutdown faster
                     .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
-                    .delay(1000).to("seda:foo");
+                    .delay(1000).syncDelayed().to("seda:foo");
 
                 from("seda:foo").routeId("route2").to("mock:bar");
             }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
index 90067b6..f8e2303 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelStopOnExceptionTest.java
@@ -67,7 +67,7 @@ public class SplitterParallelStopOnExceptionTest extends ContextTestSupport {
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(cause.getMessage().startsWith("Parallel processing failed for number "));
+            assertTrue(cause.getMessage().startsWith("Multicast processing failed for number "));
             assertEquals("Forced", cause.getCause().getMessage());
 
             String body = cause.getExchange().getIn().getBody(String.class);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
index 5aaeb09..c7e4542 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterStopOnExceptionTest.java
@@ -48,7 +48,7 @@ public class SplitterStopOnExceptionTest extends ContextTestSupport {
             fail("Should thrown an exception");
         } catch (CamelExecutionException e) {
             CamelExchangeException cause = assertIsInstanceOf(CamelExchangeException.class, e.getCause());
-            assertTrue(cause.getMessage().startsWith("Sequential processing failed for number 1."));
+            assertTrue(cause.getMessage().startsWith("Multicast processing failed for number 1."));
             assertEquals("Forced", cause.getCause().getMessage());
         }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
index e67e91c..2c94b95 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java
@@ -77,7 +77,7 @@ public class ThreadsRejectedExecutionTest extends ContextTestSupport {
                     .to("log:before")
                     // will use our custom pool
                     .threads().executorService(pool).callerRunsWhenRejected(false)
-                    .delay(200)
+                    .delay(200).syncDelayed()
                     .to("log:after")
                     .to("mock:result");
             }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
index 7e65f8f..cd456a3 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/UnmarshalProcessorTest.java
@@ -49,6 +49,8 @@ public class UnmarshalProcessorTest extends TestSupport {
         Exchange exchange2 = createExchangeWithBody(context, "body2");
         Processor processor = new UnmarshalProcessor(new MyDataFormat(exchange2));
 
+        exchange.getExchangeId();
+        exchange2.getExchangeId();
         processor.process(exchange);
 
         Exception e = exchange.getException();
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
index 2b02338..c0b9c91 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCustomInterceptorTest.java
@@ -18,12 +18,14 @@ package org.apache.camel.processor.async;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 import org.junit.Test;
 
@@ -91,13 +93,12 @@ public class AsyncEndpointCustomInterceptorTest extends ContextTestSupport {
         public Processor wrapProcessorInInterceptors(final CamelContext context, final NamedNode definition,
                                                      final Processor target, final Processor nextTarget) throws Exception {
 
-            return new Processor() {
-                public void process(Exchange exchange) throws Exception {
+            return new DelegateAsyncProcessor(target) {
+                public boolean process(final Exchange exchange, final AsyncCallback callback) {
                     // we just want to count number of interceptions
                     counter.incrementAndGet();
-
                     // and continue processing the exchange
-                    target.process(exchange);
+                    return super.process(exchange, callback);
                 }
             };
         }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
index f3a943c..09a3745 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel5Test.java
@@ -38,8 +38,7 @@ public class AsyncEndpointRecipientListParallel5Test extends ContextTestSupport
 
         assertMockEndpointsSatisfied();
 
-        // to hard to do parallel async routing so the caller thread is synchronized
-        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertNotEquals("Should use different threads", beforeThreadName, afterThreadName);
     }
 
     @Override
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
index d521e52..3c057d4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallelTest.java
@@ -36,8 +36,7 @@ public class AsyncEndpointRecipientListParallelTest extends ContextTestSupport {
         String reply = template.requestBody("direct:start", "Hello Camel", String.class);
         assertEquals("Bye Camel", reply);
 
-        // to hard to do parallel async routing so the caller thread is synchronized
-        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertNotEquals("Should use different threads", beforeThreadName, afterThreadName);
     }
 
     @Override
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
index e17da41..84c48f5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
@@ -71,6 +71,7 @@ public class RoutingSlipEventNotifierTest extends ContextTestSupport {
                 log.info("Sending: {}", event);
                 sending++;
             } else {
+                log.info("Sent: {}", event);
                 sent++;
             }
         }
diff --git a/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java b/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java
new file mode 100644
index 0000000..2e6437d
--- /dev/null
+++ b/camel-util/src/main/java/org/apache/camel/util/FilterIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+
+/**
+ * A filtering iterator: wraps another {@link Iterator} and yields only its non-null elements accepted by an optional {@link Predicate}, buffering one look-ahead element.
+ */
+public class FilterIterator<T> implements Iterator<T>, Closeable {
+
+    private Iterator<T> it; // wrapped source iterator
+    private Predicate<T> filter; // acceptance test; null accepts every non-null element
+    private T next; // buffered look-ahead element; null when nothing is buffered
+    private boolean closed; // set by close(); stops checkNext() from reading the source
+
+    public FilterIterator(Iterator<T> it) { // no predicate: only null elements are skipped
+        this(it, null);
+    }
+
+    public FilterIterator(Iterator<T> it, Predicate<T> filter) { // filter may be null
+        this.it = it;
+        this.filter = filter;
+    }
+
+    @Override
+    public void close() throws IOException { // delegates to IOHelper.closeIterator for the wrapped iterator
+        try {
+            IOHelper.closeIterator(it);
+        } finally {
+            // we are now closed, even if closing the wrapped iterator failed; drop any buffered element
+            closed = true;
+            next = null;
+        }
+    }
+
+    public boolean hasNext() { // fills the look-ahead buffer if it is empty
+        if (next == null) {
+            next = checkNext();
+        }
+        return next != null;
+    }
+
+    public T next() { // returns and clears the buffered element; throws NoSuchElementException when none is left
+        if (next == null) {
+            next = checkNext();
+        }
+        if (next != null) {
+            T ep = next;
+            next = null;
+            return ep;
+        }
+        throw new NoSuchElementException();
+    }
+
+    public void remove() {
+        it.remove(); // NOTE(review): acts on the element last read from the source; after hasNext() has buffered a look-ahead that is the buffered element, not the one last returned by next()
+    }
+
+    protected T checkNext() { // advances the source to the next accepted element; null when closed or exhausted
+        while (!closed && it.hasNext()) {
+            T ep = it.next();
+            if (ep != null && (filter == null || filter.test(ep))) { // null elements are always skipped
+                return ep;
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
index 98a4c4f..3a5843e 100644
--- a/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/camel-util/src/main/java/org/apache/camel/util/IOHelper.java
@@ -386,16 +386,14 @@ public final class IOHelper {
     }
 
     public static void closeIterator(Object it) throws IOException {
+        if (it instanceof Closeable) {
+            IOHelper.closeWithException((Closeable) it);
+        }
         if (it instanceof java.util.Scanner) {
-            // special for Scanner which implement the Closeable since JDK7
-            java.util.Scanner scanner = (java.util.Scanner) it;
-            scanner.close();
-            IOException ioException = scanner.ioException();
+            IOException ioException = ((java.util.Scanner) it).ioException();
             if (ioException != null) {
                 throw ioException;
             }
-        } else if (it instanceof Closeable) {
-            IOHelper.closeWithException((Closeable) it);
         }
     }
 
diff --git a/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java b/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
new file mode 100644
index 0000000..f421160
--- /dev/null
+++ b/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+public class AsyncCompletionService<V> { // collects results of tasks run on an Executor; in ordered mode results are handed out in submission order
+
+    private final Executor executor; // runs every submitted task
+    private final boolean ordered; // true: poll()/take() only release the task whose id equals index
+    private final PriorityQueue<Task> queue = new PriorityQueue<>(); // completed tasks, lowest id first (see Task.compareTo)
+    private final AtomicLong nextId = new AtomicLong(); // id given to the next submitted task
+    private final AtomicLong index = new AtomicLong(); // id of the next task that may be released in ordered mode
+    private final ReentrantLock lock; // guards queue
+    private final Condition available; // signalled whenever a task completes
+
+    public AsyncCompletionService(Executor executor, boolean ordered) { // uses a private lock
+        this(executor, ordered, null);
+    }
+
+    public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock) { // lock may be null
+        this.executor = executor;
+        this.ordered = ordered;
+        this.lock = lock != null ? lock : new ReentrantLock(); // fall back to a fresh lock when none is supplied
+        this.available = this.lock.newCondition();
+    }
+
+    public ReentrantLock getLock() { // exposes the lock guarding the completion queue
+        return lock;
+    }
+
+    public void submit(Consumer<Consumer<V>> runner) { // runner is handed a callback it must invoke with the result
+        Task f = new Task(nextId.getAndIncrement(), runner);
+        this.executor.execute(f);
+    }
+
+    public void skip() { // advances the ordered-release index by one without consuming a result
+        index.incrementAndGet();
+    }
+
+    public V pollUnordered() { // non-blocking; ignores and does not advance index
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            Task t = queue.poll(); // head is the completed task with the lowest id
+            return t != null ? t.result : null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public V poll() { // non-blocking; null when nothing is releasable yet
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            Task t = queue.peek();
+            if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { // ordered mode: release only if head id == index, then bump index
+                queue.poll();
+                return t.result;
+            } else {
+                return null;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public V poll(long timeout, TimeUnit unit) throws InterruptedException { // waits up to the timeout; null if it elapses
+        long nanos = unit.toNanos(timeout);
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                Task t = queue.peek();
+                if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { // same release rule as poll()
+                    queue.poll();
+                    return t.result;
+                }
+                if (nanos <= 0) {
+                    return null;
+                } else {
+                    nanos = available.awaitNanos(nanos); // returns the remaining wait time
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public V take() throws InterruptedException { // waits without a timeout until a result is releasable
+        final ReentrantLock lock = this.lock;
+        lock.lockInterruptibly();
+        try {
+            for (;;) {
+                Task t = queue.peek();
+                if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { // same release rule as poll()
+                    queue.poll();
+                    return t.result;
+                }
+                available.await();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void complete(Task task) { // called from Task.setResult once the runner reports its result
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            queue.add(task);
+            available.signalAll(); // wake every waiter so each re-checks the head of the queue
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private class Task implements Runnable, Comparable<Task> { // one submitted unit of work, ordered by submission id
+        private final long id; // submission sequence number
+        private final Consumer<Consumer<V>> runner; // user code; receives the result callback
+        private V result; // set by setResult before the task is queued as completed
+
+        Task(long id, Consumer<Consumer<V>> runner) {
+            this.id = id;
+            this.runner = runner;
+        }
+
+        @Override
+        public void run() {
+            runner.accept(this::setResult); // the runner decides when (and on which thread) to report the result
+        }
+
+        protected void setResult(V result) { // stores the result and enqueues this task as completed
+            this.result = result;
+            complete(this);
+        }
+
+        public int compareTo(Task other) { // lower id sorts first
+            return Long.compare(this.id, other.id);
+        }
+
+        public String toString() {
+            return "SubmitOrderedFutureTask[" + this.id + "]"; // NOTE(review): label does not match this class name (AsyncCompletionService.Task)
+        }
+    }
+}
diff --git a/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java b/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
new file mode 100644
index 0000000..565b87a
--- /dev/null
+++ b/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AsyncCompletionServiceTest extends Assert { // exercises AsyncCompletionService in ordered mode
+
+    private ExecutorService executor;
+    private AsyncCompletionService<Object> service;
+
+    @Before
+    public void setUp() throws Exception {
+        executor = Executors.newFixedThreadPool(5); // enough threads for both tasks of a test to run concurrently
+        service = new AsyncCompletionService<>(executor, true); // true = ordered release
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void testSubmitOrdered() throws Exception { // both tasks report immediately
+
+        service.submit(result("A"));
+        service.submit(result("B"));
+
+        Object a = service.take();
+        Object b = service.take();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedFirstTaskIsSlow() throws Exception { // B completes first but must still be released after A
+
+        service.submit(result("A", 200));
+        service.submit(result("B"));
+
+        Thread.sleep(300); // let both tasks complete before taking
+
+        Object a = service.take();
+        Object b = service.take();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedFirstTaskIsSlowUsingPollTimeout() throws Exception { // timed poll must wait for A
+
+        service.submit(result("A", 200));
+        service.submit(result("B"));
+
+        Object a = service.poll(5, TimeUnit.SECONDS);
+        Object b = service.poll(5, TimeUnit.SECONDS);
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedFirstTaskIsSlowUsingPoll() throws Exception { // A is held back by a latch
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        service.submit(result("A", latch, 5, TimeUnit.SECONDS));
+        service.submit(result("B"));
+
+        // poll should not get it the first time
+        Object a = service.poll();
+        assertNull(a);
+
+        // and neither the 2nd time
+        a = service.poll();
+        assertNull(a);
+
+        // okay complete task
+        latch.countDown();
+
+        // okay take them
+        a = service.take();
+        Object b = service.take();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedSecondTaskIsSlow() throws Exception { // A reports immediately, B after 100 ms
+
+        service.submit(result("A"));
+        service.submit(result("B", 100));
+
+        Object a = service.take();
+        Object b = service.take();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedSecondTaskIsSlowUsingPollTimeout() throws Exception { // timed poll must wait for B
+
+        service.submit(result("A"));
+        service.submit(result("B", 100));
+
+        Object a = service.poll(5, TimeUnit.SECONDS);
+        Object b = service.poll(5, TimeUnit.SECONDS);
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    @Test
+    public void testSubmitOrderedLastTaskIsSlowUsingPoll() throws Exception { // B is held back by a latch
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        service.submit(result("A"));
+        service.submit(result("B", latch, 5 ,TimeUnit.SECONDS));
+
+        // take a
+        Object a = service.take();
+        assertNotNull(a);
+
+        // poll should not get it the first time
+        Object b = service.poll();
+        assertNull(b);
+
+        // and neither the 2nd time
+        b = service.poll();
+        assertNull(b);
+
+        // okay complete task
+        latch.countDown();
+
+        // okay take it
+        b = service.take();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    Consumer<Consumer<Object>> result(Object r) { // runner that reports r immediately
+        return result -> result.accept(r);
+    }
+
+    Consumer<Consumer<Object>> result(Object r, int delay) { // runner that sleeps delay milliseconds, then reports r
+        return result -> {
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                e.printStackTrace(); // NOTE(review): interrupt flag is not restored; r is still reported below
+            }
+            result.accept(r);
+        };
+    }
+
+    Consumer<Consumer<Object>> result(Object r, CountDownLatch latch, int timeout, TimeUnit unit) { // runner that waits on latch (at most timeout), then reports r
+        return result -> {
+            try {
+                latch.await(timeout, unit);
+            } catch (InterruptedException e) {
+                e.printStackTrace(); // NOTE(review): interrupt flag is not restored; r is still reported below
+            }
+            result.accept(r);
+        };
+    }
+
+}
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
index e2544ab..a14a46b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteAllTasksTest.xml
@@ -35,7 +35,7 @@
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <!-- let this route complete all its pending messages when asked to shutdown -->
         <route id="foo" autoStartup="false" shutdownRunningTask="CompleteAllTasks">
-            <from uri="file:target/pending?initialDelay=0&amp;delay=10"/>
+            <from uri="file:target/pending?initialDelay=0&amp;delay=10&amp;synchronous=true"/>
             <delay><constant>1000</constant></delay>
             <process ref="myProcessor"/>
             <to uri="mock:bar"/>
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
index d6989e1..65001a4 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ShutdownCompleteCurrentTaskOnlyTest.xml
@@ -31,7 +31,7 @@
 
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route startupOrder="1" shutdownRunningTask="CompleteCurrentTaskOnly">
-            <from uri="file:target/pending?initialDelay=0&amp;delay=10"/>
+            <from uri="file:target/pending?initialDelay=0&amp;delay=10&amp;synchronous=true"/>
             <delay><constant>1000</constant></delay>
             <to uri="seda:foo"/>
         </route>