You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/19 05:18:51 UTC

[camel] branch main updated: (chores) camel-core: reduce method size to force inlining

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 50ec2d6a150 (chores) camel-core: reduce method size to force inlining
50ec2d6a150 is described below

commit 50ec2d6a150d26868719edde03a5cc35c30e1556
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 17 18:46:23 2023 +0200

    (chores) camel-core: reduce method size to force inlining
    
    Reduce the size of a few hot methods, so that their byte code size is smaller than 325 bytes, to increase the chance that the JVM will inline them when possible.
---
 .../camel/impl/engine/CamelInternalProcessor.java  | 151 ++++++++++++---------
 .../camel/impl/engine/DefaultReactiveExecutor.java |  88 +++++++-----
 .../impl/engine/DefaultStreamCachingStrategy.java  |  58 ++++----
 .../impl/engine/SharedCamelInternalProcessor.java  | 116 +++++++++-------
 .../camel/impl/engine/StreamCachingHelper.java     |  27 ++--
 .../errorhandler/RedeliveryErrorHandler.java       |  60 ++++----
 .../camel/main/MainDurationEventNotifier.java      |  66 +++++----
 .../org/apache/camel/support/ExchangeHelper.java   |  83 ++++++-----
 .../org/apache/camel/support/MessageSupport.java   |  11 +-
 .../org/apache/camel/support/UnitOfWorkHelper.java |  27 ++--
 10 files changed, 396 insertions(+), 291 deletions(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index af24a796b0f..e343e51536e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -285,7 +285,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         // in between the:
         //   CAMEL END USER - DEBUG ME HERE +++ START +++
         //   CAMEL END USER - DEBUG ME HERE +++ END +++
-        // you can see in the code below.
+        // you can see in the code below within the processTransacted or processNonTransacted methods.
         // ----------------------------------------------------------
 
         if (processor == null || exchange.isRouteStop()) {
@@ -295,15 +295,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         }
 
         if (shutdownStrategy.isForceShutdown()) {
-            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: "
-                         + exchange;
-            LOG.debug(msg);
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException(msg));
-            }
-            // force shutdown so we should not continue
-            originalCallback.done(true);
-            return true;
+            return processShutdown(exchange, originalCallback);
         }
 
         Object[] states;
@@ -328,76 +320,104 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
                     states[j++] = state;
                 }
             } catch (Exception e) {
-                // error in before so break out
-                exchange.setException(e);
-                try {
-                    originalCallback.done(true);
-                } finally {
-                    // task is done so reset
-                    if (taskFactory != null) {
-                        taskFactory.release(afterTask);
-                    }
-                }
-                return true;
+                return handleException(exchange, originalCallback, e, afterTask);
             }
         }
 
         if (exchange.isTransacted()) {
-            // must be synchronized for transacted exchanges
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(),
-                        exchange);
-            }
-            try {
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ START +++
-                // ----------------------------------------------------------
-                processor.process(exchange);
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ END +++
-                // ----------------------------------------------------------
-            } catch (Exception e) {
-                exchange.setException(e);
-            } finally {
-                // processing is done
-                afterTask.done(true);
-            }
-            // we are done synchronously - must return true
-            return true;
+            return processTransacted(exchange, afterTask);
         } else {
-            final UnitOfWork uow = exchange.getUnitOfWork();
+            return processNonTransacted(exchange, afterTask);
+        }
+    }
 
-            // optimize to only do before uow processing if really needed
-            AsyncCallback async = afterTask;
-            boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
-            if (beforeAndAfter) {
-                async = uow.beforeProcess(processor, exchange, async);
-            }
+    private static boolean processShutdown(Exchange exchange, AsyncCallback originalCallback) {
+        String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: "
+                     + exchange;
+        LOG.debug(msg);
+        if (exchange.getException() == null) {
+            exchange.setException(new RejectedExecutionException(msg));
+        }
+        // force shutdown so we should not continue
+        originalCallback.done(true);
+        return true;
+    }
+
+    private boolean processNonTransacted(Exchange exchange, CamelInternalTask afterTask) {
+        final AsyncCallback async = beforeProcess(exchange, afterTask);
+
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ START +++
+        // ----------------------------------------------------------
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+        }
+        boolean sync = processor.process(exchange, async);
+        if (!sync) {
+            EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
+        }
+
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ END +++
+        // ----------------------------------------------------------
 
+        // CAMEL-18255: move uow.afterProcess handling to the callback
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
+                    sync ? "synchronously" : "asynchronously",
+                    exchange.getExchangeId(), exchange);
+        }
+        return sync;
+    }
+
+    private AsyncCallback beforeProcess(Exchange exchange, CamelInternalTask afterTask) {
+        final UnitOfWork uow = exchange.getUnitOfWork();
+
+        // optimize to only do before uow processing if really needed
+        if (uow != null && uow.isBeforeAfterProcess()) {
+            return uow.beforeProcess(processor, exchange, afterTask);
+        }
+        return afterTask;
+    }
+
+    private boolean processTransacted(Exchange exchange, CamelInternalTask afterTask) {
+        // must be synchronized for transacted exchanges
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(),
+                    exchange);
+        }
+        try {
             // ----------------------------------------------------------
             // CAMEL END USER - DEBUG ME HERE +++ START +++
             // ----------------------------------------------------------
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
-            }
-            boolean sync = processor.process(exchange, async);
-            if (!sync) {
-                EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
-            }
-
+            processor.process(exchange);
             // ----------------------------------------------------------
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
+        } catch (Exception e) {
+            exchange.setException(e);
+        } finally {
+            // processing is done
+            afterTask.done(true);
+        }
+        // we are done synchronously - must return true
+        return true;
+    }
 
-            // CAMEL-18255: move uow.afterProcess handling to the callback
-
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
-                        sync ? "synchronously" : "asynchronously",
-                        exchange.getExchangeId(), exchange);
+    private boolean handleException(
+            Exchange exchange, AsyncCallback originalCallback, Exception e, CamelInternalTask afterTask) {
+        // error in before so break out
+        exchange.setException(e);
+        try {
+            originalCallback.done(true);
+        } finally {
+            // task is done so reset
+            if (taskFactory != null) {
+                taskFactory.release(afterTask);
             }
-            return sync;
         }
+        return true;
     }
 
     @Override
@@ -820,9 +840,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             }
 
             // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
-            UnitOfWork created = null;
+
             UnitOfWork uow = exchange.getUnitOfWork();
 
+            UnitOfWork created = null;
             if (uow == null) {
                 // If there is no existing UoW, then we should start one and
                 // terminate it once processing is completed for the exchange.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index b0f2806917b..596565f4651 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -136,37 +136,37 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                 LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);
             }
             if (main) {
-                if (!queue.isEmpty()) {
-                    if (back == null) {
-                        back = new ArrayDeque<>();
-                    }
-                    back.push(queue);
-                    queue = new ArrayDeque<>();
-                }
+                executeMainFlow();
             }
             if (first) {
                 queue.addFirst(runnable);
-                if (stats) {
-                    executor.pendingTasks.increment();
-                }
             } else {
                 queue.addLast(runnable);
-                if (stats) {
-                    executor.pendingTasks.increment();
+            }
+
+            incrementPendingTasks();
+            tryExecuteReactiveWork(runnable, sync);
+        }
+
+        private void executeMainFlow() {
+            if (!queue.isEmpty()) {
+                if (back == null) {
+                    back = new ArrayDeque<>();
                 }
+                back.push(queue);
+                queue = new ArrayDeque<>();
             }
+        }
+
+        private void tryExecuteReactiveWork(Runnable runnable, boolean sync) {
             if (!running || sync) {
                 running = true;
-                if (stats) {
-                    executor.runningWorkers.increment();
-                }
+                incrementRunningWorkers();
                 try {
                     executeReactiveWork();
                 } finally {
                     running = false;
-                    if (stats) {
-                        executor.runningWorkers.decrement();
-                    }
+                    decrementRunningWorkers();
                 }
             } else {
                 if (LOG.isTraceEnabled()) {
@@ -186,18 +186,44 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                         break;
                     }
                 }
-                try {
-                    if (stats) {
-                        executor.pendingTasks.decrement();
-                    }
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Worker #{} running: {}", number, polled);
-                    }
-                    polled.run();
-                } catch (Exception t) {
-                    LOG.warn("Error executing reactive work due to {}. This exception is ignored.",
-                            t.getMessage(), t);
+                doRun(polled);
+            }
+        }
+
+        private void doRun(Runnable polled) {
+            try {
+                decrementPendingTasks();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Worker #{} running: {}", number, polled);
                 }
+                polled.run();
+            } catch (Exception t) {
+                LOG.warn("Error executing reactive work due to {}. This exception is ignored.",
+                        t.getMessage(), t);
+            }
+        }
+
+        private void decrementRunningWorkers() {
+            if (stats) {
+                executor.runningWorkers.decrement();
+            }
+        }
+
+        private void incrementRunningWorkers() {
+            if (stats) {
+                executor.runningWorkers.increment();
+            }
+        }
+
+        private void incrementPendingTasks() {
+            if (stats) {
+                executor.pendingTasks.increment();
+            }
+        }
+
+        private void decrementPendingTasks() {
+            if (stats) {
+                executor.pendingTasks.decrement();
             }
         }
 
@@ -207,9 +233,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                 return false;
             }
             try {
-                if (stats) {
-                    executor.pendingTasks.decrement();
-                }
+                decrementPendingTasks();
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Running: {}", polled);
                 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
index f7513cb65f2..4c0919f7b7e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
@@ -260,21 +260,7 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came
         if (body != null) {
             boolean allowed = allowClasses == null && denyClasses == null;
             if (!allowed) {
-                Class<?> source = body.getClass();
-                if (denyClasses != null && allowClasses != null) {
-                    // deny takes precedence
-                    allowed = !isAssignableFrom(source, denyClasses);
-                    if (allowed) {
-                        allowed = isAssignableFrom(source, allowClasses);
-                    }
-                } else if (denyClasses != null) {
-                    allowed = !isAssignableFrom(source, denyClasses);
-                } else {
-                    allowed = isAssignableFrom(source, allowClasses);
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied");
-                }
+                allowed = checkAllowDenyList(body);
             }
             if (allowed) {
                 cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body);
@@ -285,20 +271,44 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came
                 LOG.trace("Cached stream to {} -> {}", cache.inMemory() ? "memory" : "spool", cache);
             }
             if (statistics.isStatisticsEnabled()) {
-                try {
-                    if (cache.inMemory()) {
-                        statistics.updateMemory(cache.length());
-                    } else {
-                        statistics.updateSpool(cache.length());
-                    }
-                } catch (Exception e) {
-                    LOG.debug("Error updating cache statistics. This exception is ignored.", e);
-                }
+                computeStatistics(cache);
             }
         }
         return cache;
     }
 
+    private boolean checkAllowDenyList(Object body) {
+        boolean allowed;
+        Class<?> source = body.getClass();
+        if (denyClasses != null && allowClasses != null) {
+            // deny takes precedence
+            allowed = !isAssignableFrom(source, denyClasses);
+            if (allowed) {
+                allowed = isAssignableFrom(source, allowClasses);
+            }
+        } else if (denyClasses != null) {
+            allowed = !isAssignableFrom(source, denyClasses);
+        } else {
+            allowed = isAssignableFrom(source, allowClasses);
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied");
+        }
+        return allowed;
+    }
+
+    private void computeStatistics(StreamCache cache) {
+        try {
+            if (cache.inMemory()) {
+                statistics.updateMemory(cache.length());
+            } else {
+                statistics.updateSpool(cache.length());
+            }
+        } catch (Exception e) {
+            LOG.debug("Error updating cache statistics. This exception is ignored.", e);
+        }
+    }
+
     protected static boolean isAssignableFrom(Class<?> source, Collection<Class<?>> targets) {
         for (Class<?> target : targets) {
             if (target.isAssignableFrom(source)) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
index f8b3de187e9..d64b2c174cb 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
@@ -160,9 +160,7 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
                     states[j++] = state;
                 }
             } catch (Exception e) {
-                exchange.setException(e);
-                originalCallback.done(true);
-                return true;
+                return handleException(exchange, originalCallback, e);
             }
         }
 
@@ -170,63 +168,77 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
         AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor);
 
         if (exchange.isTransacted()) {
-            // must be synchronized for transacted exchanges
-            if (LOG.isTraceEnabled()) {
-                if (exchange.isTransacted()) {
-                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}",
-                            exchange.getExchangeId(), exchange);
-                } else {
-                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}",
-                            exchange.getExchangeId(), exchange);
-                }
-            }
-            // ----------------------------------------------------------
-            // CAMEL END USER - DEBUG ME HERE +++ START +++
-            // ----------------------------------------------------------
-            try {
-                processor.process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-            // ----------------------------------------------------------
-            // CAMEL END USER - DEBUG ME HERE +++ END +++
-            // ----------------------------------------------------------
-            callback.done(true);
-            return true;
+            return processTransacted(exchange, processor, callback);
         } else {
-            final UnitOfWork uow = exchange.getUnitOfWork();
+            return processNonTransacted(exchange, processor, callback);
+        }
+    }
 
-            // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread
-            AsyncCallback async = callback;
-            boolean beforeAndAfter = uow.isBeforeAfterProcess();
-            if (beforeAndAfter) {
-                async = uow.beforeProcess(processor, exchange, async);
-            }
+    private static boolean handleException(Exchange exchange, AsyncCallback originalCallback, Exception e) {
+        exchange.setException(e);
+        originalCallback.done(true);
+        return true;
+    }
 
-            // ----------------------------------------------------------
-            // CAMEL END USER - DEBUG ME HERE +++ START +++
-            // ----------------------------------------------------------
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
-            }
-            boolean sync = processor.process(exchange, async);
-            // ----------------------------------------------------------
-            // CAMEL END USER - DEBUG ME HERE +++ END +++
-            // ----------------------------------------------------------
+    private static boolean processNonTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) {
+        final UnitOfWork uow = exchange.getUnitOfWork();
 
-            // optimize to only do after uow processing if really needed
-            if (beforeAndAfter) {
-                // execute any after processor work (in current thread, not in the callback)
-                uow.afterProcess(processor, exchange, callback, sync);
-            }
+        // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread
+        AsyncCallback async = callback;
+        boolean beforeAndAfter = uow.isBeforeAfterProcess();
+        if (beforeAndAfter) {
+            async = uow.beforeProcess(processor, exchange, async);
+        }
+
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ START +++
+        // ----------------------------------------------------------
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+        }
+        boolean sync = processor.process(exchange, async);
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ END +++
+        // ----------------------------------------------------------
+
+        // optimize to only do after uow processing if really needed
+        if (beforeAndAfter) {
+            // execute any after processor work (in current thread, not in the callback)
+            uow.afterProcess(processor, exchange, callback, sync);
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
+                    sync ? "synchronously" : "asynchronously",
+                    exchange.getExchangeId(), exchange);
+        }
+        return sync;
+    }
 
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
-                        sync ? "synchronously" : "asynchronously",
+    private static boolean processTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) {
+        // must be synchronized for transacted exchanges
+        if (LOG.isTraceEnabled()) {
+            if (exchange.isTransacted()) {
+                LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}",
+                        exchange.getExchangeId(), exchange);
+            } else {
+                LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}",
                         exchange.getExchangeId(), exchange);
             }
-            return sync;
         }
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ START +++
+        // ----------------------------------------------------------
+        try {
+            processor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        // ----------------------------------------------------------
+        // CAMEL END USER - DEBUG ME HERE +++ END +++
+        // ----------------------------------------------------------
+        callback.done(true);
+        return true;
     }
 
     /**
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
index 63fba73c035..d9ba7c9e0b2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
@@ -45,12 +45,7 @@ final class StreamCachingHelper {
                 return sc;
             }
         } catch (Exception e) {
-            // lets allow Camels error handler to deal with stream cache failures
-            StreamCacheException tce = new StreamCacheException(null, e);
-            exchange.setException(tce);
-            // because this is stream caching error then we cannot use redelivery as the message body is corrupt
-            // so mark as redelivery exhausted
-            exchange.getExchangeExtension().setRedeliveryExhausted(true);
+            handleException(exchange, null, e);
         }
         // check if we somewhere failed due to a stream caching exception
         Throwable cause = exchange.getException();
@@ -76,15 +71,23 @@ final class StreamCachingHelper {
                 }
                 return sc;
             } catch (Exception e) {
-                // lets allow Camels error handler to deal with stream cache failures
-                StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e);
-                exchange.setException(tce);
-                // because this is stream caching error then we cannot use redelivery as the message body is corrupt
-                // so mark as redelivery exhausted
-                exchange.getExchangeExtension().setRedeliveryExhausted(true);
+                handleException(exchange, e);
             }
         }
         return null;
     }
 
+    private static void handleException(Exchange exchange, Exception e) {
+        handleException(exchange, exchange.getMessage().getBody(), e);
+    }
+
+    private static void handleException(Exchange exchange, Object value, Exception e) {
+        // lets allow Camels error handler to deal with stream cache failures
+        StreamCacheException tce = new StreamCacheException(value, e);
+        exchange.setException(tce);
+        // because this is stream caching error then we cannot use redelivery as the message body is corrupt
+        // so mark as redelivery exhausted
+        exchange.getExchangeExtension().setRedeliveryExhausted(true);
+    }
+
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2a41f121901..a35f89616cb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -428,26 +428,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 run = false;
             }
             if (!run) {
-                LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
-                if (exchange.getException() == null) {
-                    exchange.setException(new RejectedExecutionException());
-                }
-                AsyncCallback cb = callback;
-                taskFactory.release(this);
-                cb.done(false);
+                runNotAllowed();
                 return;
             }
             if (exchange.getExchangeExtension().isInterrupted()) {
-                // mark the exchange to stop continue routing when interrupted
-                // as we do not want to continue routing (for example a task has been cancelled)
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
-                }
-                exchange.setRouteStop(true);
-                // we should not continue routing so call callback
-                AsyncCallback cb = callback;
-                taskFactory.release(this);
-                cb.done(false);
+                runInterrupted();
                 return;
             }
 
@@ -462,13 +447,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
             if (failure || bridge) {
                 // previous processing cause an exception
-                handleException();
-                onExceptionOccurred();
-                prepareExchangeAfterFailure(exchange);
-                // we do not support redelivery so continue callback
-                AsyncCallback cb = callback;
-                taskFactory.release(this);
-                reactiveExecutor.schedule(cb);
+                handlePreviousFailure();
             } else if (first) {
                 // first time call the target processor
                 first = false;
@@ -481,6 +460,39 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             }
         }
 
+        private void handlePreviousFailure() {
+            handleException();
+            onExceptionOccurred();
+            prepareExchangeAfterFailure(exchange);
+            // we do not support redelivery so continue callback
+            AsyncCallback cb = callback;
+            taskFactory.release(this);
+            reactiveExecutor.schedule(cb);
+        }
+
+        private void runInterrupted() {
+            // mark the exchange to stop continue routing when interrupted
+            // as we do not want to continue routing (for example a task has been cancelled)
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+            }
+            exchange.setRouteStop(true);
+            // we should not continue routing so call callback
+            AsyncCallback cb = callback;
+            taskFactory.release(this);
+            cb.done(false);
+        }
+
+        private void runNotAllowed() {
+            LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            AsyncCallback cb = callback;
+            taskFactory.release(this);
+            cb.done(false);
+        }
+
         protected void handleException() {
             Exception e = exchange.getException();
             // e is never null
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
index 17fd88f0508..1a3dacc4901 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -79,17 +79,8 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
             return;
         }
 
-        final boolean reloaded = event.getType() == CamelEvent.Type.RouteReloaded;
-
-        if (reloaded) {
-            if (restartDuration) {
-                LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
-                shutdownStrategy.restartAwait();
-                doneMessages.reset();
-                if (watch != null) {
-                    watch.restart();
-                }
-            }
+        if (event.getType() == CamelEvent.Type.RouteReloaded) {
+            resetOnReload();
             return;
         }
 
@@ -108,30 +99,49 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
                 }
 
                 if (result && shutdownStrategy.isRunAllowed()) {
-                    if ("shutdown".equalsIgnoreCase(action)) {
-                        LOG.info("Duration max messages triggering shutdown of the JVM");
-                        // use thread to shut down Camel as otherwise we would block current thread
-                        camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask)
-                                .start();
-                    } else if ("stop".equalsIgnoreCase(action)) {
-                        LOG.info("Duration max messages triggering stopping all routes");
-                        // use thread to stop routes as otherwise we would block current thread
-                        camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask)
-                                .start();
-                    }
+                    triggerDoneEvent();
                 }
             }
         }
 
         // idle reacts on both incoming and complete messages
         if (maxIdleSeconds > 0) {
-            final boolean begin = event.getType() == CamelEvent.Type.ExchangeCreated;
+            resetOnActivity(event, complete);
+        }
+    }
 
-            if (begin || complete) {
-                if (watch != null) {
-                    LOG.trace("Message activity so restarting stop watch");
-                    watch.restart();
-                }
+    private void triggerDoneEvent() {
+        if ("shutdown".equalsIgnoreCase(action)) {
+            LOG.info("Duration max messages triggering shutdown of the JVM");
+            // use thread to shut down Camel as otherwise we would block current thread
+            camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask)
+                    .start();
+        } else if ("stop".equalsIgnoreCase(action)) {
+            LOG.info("Duration max messages triggering stopping all routes");
+            // use thread to stop routes as otherwise we would block current thread
+            camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask)
+                    .start();
+        }
+    }
+
+    private void resetOnActivity(CamelEvent event, boolean complete) {
+        final boolean created = event.getType() == CamelEvent.Type.ExchangeCreated;
+
+        if (created || complete) {
+            if (watch != null) {
+                LOG.trace("Message activity so restarting stop watch");
+                watch.restart();
+            }
+        }
+    }
+
+    private void resetOnReload() {
+        if (restartDuration) {
+            LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
+            shutdownStrategy.restartAwait();
+            doneMessages.reset();
+            if (watch != null) {
+                watch.restart();
             }
         }
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 32cd22eec71..0a2da86ec14 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -34,6 +34,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeExtension;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
@@ -343,58 +344,74 @@ public final class ExchangeHelper {
 
     private static void doCopyResults(Exchange result, Exchange source, boolean preserverPattern) {
         if (result == source) {
-            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
-            // and the result is not failed
-            if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
-                // copy IN to OUT as we expect a OUT response
-                result.getOut().copyFrom(source.getIn());
-            }
+            copyFromOutMessageConditionally(result, source);
             return;
         }
 
         if (source.hasOut()) {
-            if (preserverPattern) {
-                // exchange pattern sensitive
-                Message resultMessage = getResultMessage(result);
-                resultMessage.copyFrom(source.getOut());
-            } else {
-                result.getOut().copyFrom(source.getOut());
-            }
+            copyFromOutMessage(result, source, preserverPattern);
         } else {
-            // no results so lets copy the last input
-            // as the final processor on a pipeline might not
-            // have created any OUT; such as a mock:endpoint
-            // so lets assume the last IN is the OUT
-            if (!preserverPattern && result.getPattern().isOutCapable()) {
-                // only set OUT if its OUT capable
-                result.getOut().copyFrom(source.getIn());
-            } else {
-                // if not replace IN instead to keep the MEP
-                result.getIn().copyFrom(source.getIn());
-                // clear any existing OUT as the result is on the IN
-                if (result.hasOut()) {
-                    result.setOut(null);
-                }
-            }
+            copyFromInMessage(result, source, preserverPattern);
         }
 
         if (source.hasProperties()) {
             result.getProperties().putAll(source.getProperties());
         }
-        source.getExchangeExtension().copyInternalProperties(result);
-        source.getExchangeExtension().copySafeCopyPropertiesTo(result.getExchangeExtension());
+
+        final ExchangeExtension sourceExtension = source.getExchangeExtension();
+        sourceExtension.copyInternalProperties(result);
+
+        final ExchangeExtension resultExtension = result.getExchangeExtension();
+        sourceExtension.copySafeCopyPropertiesTo(resultExtension);
 
         // copy over state
         result.setRouteStop(source.isRouteStop());
         result.setRollbackOnly(source.isRollbackOnly());
         result.setRollbackOnlyLast(source.isRollbackOnlyLast());
-        result.getExchangeExtension().setNotifyEvent(source.getExchangeExtension().isNotifyEvent());
-        result.getExchangeExtension().setRedeliveryExhausted(source.getExchangeExtension().isRedeliveryExhausted());
-        result.getExchangeExtension().setErrorHandlerHandled(source.getExchangeExtension().getErrorHandlerHandled());
+        resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent());
+        resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted());
+        resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled());
 
         result.setException(source.getException());
     }
 
+    private static void copyFromOutMessageConditionally(Exchange result, Exchange source) {
+        // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
+        // and the result is not failed
+        if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
+            // copy IN to OUT as we expect a OUT response
+            result.getOut().copyFrom(source.getIn());
+        }
+    }
+
+    private static void copyFromInMessage(Exchange result, Exchange source, boolean preserverPattern) {
+        // no results so lets copy the last input
+        // as the final processor on a pipeline might not
+        // have created any OUT; such as a mock:endpoint
+        // so lets assume the last IN is the OUT
+        if (!preserverPattern && result.getPattern().isOutCapable()) {
+            // only set OUT if its OUT capable
+            result.getOut().copyFrom(source.getIn());
+        } else {
+            // if not replace IN instead to keep the MEP
+            result.getIn().copyFrom(source.getIn());
+            // clear any existing OUT as the result is on the IN
+            if (result.hasOut()) {
+                result.setOut(null);
+            }
+        }
+    }
+
+    private static void copyFromOutMessage(Exchange result, Exchange source, boolean preserverPattern) {
+        if (preserverPattern) {
+            // exchange pattern sensitive
+            Message resultMessage = getResultMessage(result);
+            resultMessage.copyFrom(source.getOut());
+        } else {
+            result.getOut().copyFrom(source.getOut());
+        }
+    }
+
     /**
      * Returns the message where to write results in an exchange-pattern-sensitive way.
      *
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
index 97caf1aa730..f1813d0b32b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
@@ -217,12 +217,7 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data
 
         // the headers may be the same instance if the end user has made some mistake
         // and set the OUT message with the same header instance of the IN message etc
-        boolean sameHeadersInstance = false;
-        if (hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders()) {
-            sameHeadersInstance = true;
-        }
-
-        if (!sameHeadersInstance) {
+        if (!sameHeaders(that)) {
             if (hasHeaders()) {
                 // okay its safe to clear the headers
                 getHeaders().clear();
@@ -233,6 +228,10 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data
         }
     }
 
+    private boolean sameHeaders(Message that) {
+        return hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders();
+    }
+
     @Override
     public Exchange getExchange() {
         return exchange;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index 5450dddfcd5..e3b32b56a20 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -64,12 +64,7 @@ public final class UnitOfWorkHelper {
 
         if (synchronizations.size() > 1) {
             // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
-            List<Synchronization> copy = new ArrayList<>(synchronizations);
-
-            // reverse so we invoke it FILO style instead of FIFO
-            Collections.reverse(copy);
-            // and honor if any was ordered by sorting it accordingly
-            copy.sort(OrderedComparator.get());
+            final List<Synchronization> copy = safeCopy(synchronizations);
 
             boolean failed = exchange.isFailed();
 
@@ -101,6 +96,13 @@ public final class UnitOfWorkHelper {
     public static void beforeRouteSynchronizations(
             Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
         // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
+        final List<Synchronization> copy = safeCopy(synchronizations);
+
+        // invoke synchronization callbacks
+        invokeSynchronizationCallbacks(route, exchange, log, copy);
+    }
+
+    private static List<Synchronization> safeCopy(List<Synchronization> synchronizations) {
         List<Synchronization> copy = new ArrayList<>(synchronizations);
 
         if (copy.size() > 1) {
@@ -109,8 +111,10 @@ public final class UnitOfWorkHelper {
             // and honor if any was ordered by sorting it accordingly
             copy.sort(OrderedComparator.get());
         }
+        return copy;
+    }
 
-        // invoke synchronization callbacks
+    private static void invokeSynchronizationCallbacks(Route route, Exchange exchange, Logger log, List<Synchronization> copy) {
         for (Synchronization synchronization : copy) {
             final SynchronizationRouteAware routeSynchronization = synchronization.getRouteSynchronization();
             if (routeSynchronization != null) {
@@ -128,14 +132,7 @@ public final class UnitOfWorkHelper {
     public static void afterRouteSynchronizations(
             Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
         // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
-        List<Synchronization> copy = new ArrayList<>(synchronizations);
-
-        if (copy.size() > 1) {
-            // reverse so we invoke it FILO style instead of FIFO
-            Collections.reverse(copy);
-            // and honor if any was ordered by sorting it accordingly
-            copy.sort(OrderedComparator.get());
-        }
+        final List<Synchronization> copy = safeCopy(synchronizations);
 
         // invoke synchronization callbacks
         for (Synchronization synchronization : copy) {