You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/24 13:07:52 UTC

[camel] branch master updated (6f50ca3 -> 074cdce)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 6f50ca3  Upgrade Hibernate Validator to version 6.1.1.Final
     new f807dab  CAMEL-14435: camel-core - Optimize getting reactive executor in routing engine
     new 976dd82  CAMEL-14435: camel-core - Optimize getting shutdown strategy in routing engine
     new 074cdce  CAMEL-14435: camel-core - Optimize routing engine

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/impl/engine/BaseRouteService.java |  5 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  5 +++-
 .../camel/impl/engine/DefaultProducerCache.java    |  2 +-
 .../camel/impl/engine/DefaultRouteContext.java     |  2 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  9 +++---
 .../impl/engine/SubscribeMethodProcessor.java      |  2 +-
 .../camel/processor/CamelInternalProcessor.java    | 22 ++++++++++----
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../java/org/apache/camel/processor/Pipeline.java  | 11 ++++---
 .../processor/SharedCamelInternalProcessor.java    | 22 ++++++++++----
 .../apache/camel/processor/UnitOfWorkProducer.java |  2 +-
 .../camel/processor/channel/DefaultChannel.java    | 10 +++++++
 .../errorhandler/RedeliveryErrorHandler.java       | 34 +++++++++++++---------
 .../org/apache/camel/reifier/AggregateReifier.java |  2 +-
 .../apache/camel/reifier/OnCompletionReifier.java  |  2 +-
 .../org/apache/camel/reifier/ProcessorReifier.java |  2 +-
 .../apache/camel/reifier/ResequenceReifier.java    |  4 +--
 .../org/apache/camel/reifier/WireTapReifier.java   |  2 +-
 18 files changed, 95 insertions(+), 45 deletions(-)


[camel] 02/03: CAMEL-14435: camel-core - Optimize getting shutdown strategy in routing engine

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 976dd82879698f67f784457f5c3dec6067d8b3fc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 24 12:34:38 2020 +0100

    CAMEL-14435: camel-core - Optimize getting shutdown strategy in routing engine
---
 .../java/org/apache/camel/processor/CamelInternalProcessor.java  | 9 ++++++++-
 .../org/apache/camel/processor/SharedCamelInternalProcessor.java | 6 ++++--
 .../camel/processor/errorhandler/RedeliveryErrorHandler.java     | 5 ++++-
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 0f82c90..bcd767b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -47,6 +47,7 @@ import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.Tracer;
@@ -99,6 +100,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
 
     private CamelContext camelContext;
     private ReactiveExecutor reactiveExecutor;
+    private ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>();
     private byte statefulAdvices;
 
@@ -124,6 +126,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
         // optimize to preset reactive executor
         if (camelContext != null) {
             reactiveExecutor = camelContext.getReactiveExecutor();
+            shutdownStrategy = camelContext.getShutdownStrategy();
         }
     }
 
@@ -181,9 +184,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
             return true;
         }
 
+        // TODO: should not be needed
         if (reactiveExecutor == null) {
             reactiveExecutor = exchange.getContext().getReactiveExecutor();
         }
+        if (shutdownStrategy == null) {
+            shutdownStrategy = exchange.getContext().getShutdownStrategy();
+        }
 
         // optimise to use object array for states, and only for the number of advices that keep state
         final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES;
@@ -312,7 +319,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
         }
 
         // determine if we can still run, or the camel context is forcing a shutdown
-        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
+        boolean forceShutdown = shutdownStrategy.forceShutdown(this);
         if (forceShutdown) {
             String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
             LOG.debug(msg);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index c47fc50..37d457c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
@@ -34,6 +33,7 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
@@ -75,6 +75,7 @@ public class SharedCamelInternalProcessor {
     private final CamelContext camelContext;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager awaitManager;
+    private final ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice> advices;
     private byte statefulAdvices;
 
@@ -82,6 +83,7 @@ public class SharedCamelInternalProcessor {
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
 
         if (advices != null) {
             this.advices = new ArrayList<>(advices.length);
@@ -304,7 +306,7 @@ public class SharedCamelInternalProcessor {
 
         // determine if we can still run, or the camel context is forcing a shutdown
         if (processor instanceof Service) {
-            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown((Service) processor);
+            boolean forceShutdown = shutdownStrategy.forceShutdown((Service) processor);
             if (forceShutdown) {
                 String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
                 LOG.debug(msg);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2fcaae2..32d61bc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -42,6 +42,7 @@ import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ShutdownPrepared;
+import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -73,6 +74,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     protected final CamelContext camelContext;
     protected final ReactiveExecutor reactiveExecutor;
     protected final AsyncProcessorAwaitManager awaitManager;
+    protected final ShutdownStrategy shutdownStrategy;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
     protected final boolean deadLetterHandleNewException;
@@ -102,6 +104,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
         this.output = output;
@@ -494,7 +497,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
         protected boolean isRunAllowed() {
             // if camel context is forcing a shutdown then do not allow running
-            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(RedeliveryErrorHandler.this);
+            boolean forceShutdown = shutdownStrategy.forceShutdown(RedeliveryErrorHandler.this);
             if (forceShutdown) {
                 LOG.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
                 return false;


[camel] 03/03: CAMEL-14435: camel-core - Optimize routing engine

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 074cdce530367dddae229b72e9ab4cabe5b7073b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 24 13:16:35 2020 +0100

    CAMEL-14435: camel-core - Optimize routing engine
---
 .../camel/impl/engine/DefaultRouteContext.java     |  2 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  9 +++--
 .../impl/engine/SubscribeMethodProcessor.java      |  2 +-
 .../camel/processor/CamelInternalProcessor.java    | 43 ++++++----------------
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../apache/camel/processor/UnitOfWorkProducer.java |  2 +-
 .../camel/processor/channel/DefaultChannel.java    | 13 +++++--
 .../org/apache/camel/reifier/AggregateReifier.java |  2 +-
 .../apache/camel/reifier/OnCompletionReifier.java  |  2 +-
 .../org/apache/camel/reifier/ProcessorReifier.java |  2 +-
 .../apache/camel/reifier/ResequenceReifier.java    |  4 +-
 .../org/apache/camel/reifier/WireTapReifier.java   |  2 +-
 12 files changed, 36 insertions(+), 49 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
index 282fbd5..e7739d6 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java
@@ -176,7 +176,7 @@ public class DefaultRouteContext implements RouteContext {
             Processor target = new Pipeline(getCamelContext(), eventDrivenProcessors);
 
             // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
-            CamelInternalProcessor internal = new CamelInternalProcessor(target);
+            CamelInternalProcessor internal = new CamelInternalProcessor(getCamelContext(), target);
             internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));
 
             // and then optionally add route policy processor if a custom policy is set
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index deba2d8..e2d69dd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -33,6 +33,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -60,6 +61,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
     private String id;
     private final Logger log;
     private final CamelContext context;
+    private final InflightRepository inflightRepository;
     private RouteContext prevRouteContext;
     private RouteContext routeContext;
     private List<Synchronization> synchronizations;
@@ -76,6 +78,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
             log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
         }
         context = exchange.getContext();
+        inflightRepository = exchange.getContext().getInflightRepository();
 
         if (context.isAllowUseOriginalMessage()) {
             // special for JmsMessage as it can cause it to loose headers later.
@@ -112,7 +115,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
         }
 
         // register to inflight registry
-        context.getInflightRepository().add(exchange);
+        inflightRepository.add(exchange);
     }
 
     UnitOfWork newInstance(Exchange exchange) {
@@ -206,9 +209,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service {
         UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
 
         // unregister from inflight registry, before signalling we are done
-        if (exchange.getContext() != null) {
-            exchange.getContext().getInflightRepository().remove(exchange);
-        }
+        inflightRepository.remove(exchange);
 
         // then fire event to signal the exchange is done
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
index b010417..14d2fc9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java
@@ -57,7 +57,7 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem
         Processor answer = endpoint.getCamelContext().adapt(ExtendedCamelContext.class)
                 .getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method);
         // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
-        CamelInternalProcessor internal = new CamelInternalProcessor(answer);
+        CamelInternalProcessor internal = new CamelInternalProcessor(endpoint.getCamelContext(), answer);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
 
         Predicate p;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index bcd767b..527a972 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -92,42 +92,29 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order the advices are executed.
  */
-public class CamelInternalProcessor extends DelegateAsyncProcessor implements CamelContextAware {
+public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
 
     private static final Object[] EMPTY_STATES = new Object[0];
 
-    private CamelContext camelContext;
-    private ReactiveExecutor reactiveExecutor;
-    private ShutdownStrategy shutdownStrategy;
+    private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
+    private final ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>();
     private byte statefulAdvices;
 
-    public CamelInternalProcessor() {
+    public CamelInternalProcessor(CamelContext camelContext) {
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
-    public CamelInternalProcessor(Processor processor) {
+    public CamelInternalProcessor(CamelContext camelContext, Processor processor) {
         super(processor);
-    }
-
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        // optimize to preset reactive executor
-        if (camelContext != null) {
-            reactiveExecutor = camelContext.getReactiveExecutor();
-            shutdownStrategy = camelContext.getShutdownStrategy();
-        }
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
     /**
@@ -184,14 +171,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements Ca
             return true;
         }
 
-        // TODO: should not be needed
-        if (reactiveExecutor == null) {
-            reactiveExecutor = exchange.getContext().getReactiveExecutor();
-        }
-        if (shutdownStrategy == null) {
-            shutdownStrategy = exchange.getContext().getShutdownStrategy();
-        }
-
         // optimise to use object array for states, and only for the number of advices that keep state
         final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator object
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 2f4b637..7a4c0bd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -765,7 +765,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
      * @return the unit of work processor
      */
     protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
-        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new CamelInternalProcessor(exchange.getContext(), processor);
 
         // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
         UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index bd92cab..fecac02 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -41,7 +41,7 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer {
         super(producer.getEndpoint());
         this.producer = producer;
         // wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(producer);
+        CamelInternalProcessor internal = new CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
         this.processor = internal;
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 92ef735..5034a07 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -70,6 +70,10 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
     private RouteContext routeContext;
     private boolean routeScoped = true;
 
+    public DefaultChannel(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     @Override
     public Processor getOutput() {
         // the errorHandler is already decorated with interceptors
@@ -133,7 +137,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
+        // do not call super as we want to be in control here of the lifecycle
+
         // the output has now been created, so assign the output as the processor
         setProcessor(getOutput());
         ServiceHelper.startService(errorHandler, output);
@@ -141,7 +146,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
+        // do not call super as we want to be in control here of the lifecycle
+
         if (isRouteScoped()) {
             // only stop services if not context scoped (as context scoped is reused by others)
             ServiceHelper.stopService(output, errorHandler);
@@ -150,7 +156,8 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doShutdown() throws Exception {
-        super.doShutdown();
+        // do not call super as we want to be in control here of the lifecycle
+
         ServiceHelper.stopAndShutdownServices(output, errorHandler);
     }
 
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 5fb0845..791747e 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -52,7 +52,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
         // wrap the aggregate route in a unit of work processor
-        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
+        CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         Expression correlation = definition.getExpression().createExpression(routeContext);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
index ab18213..b2d9a3c 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java
@@ -66,7 +66,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
         // wrap the on completion route in a unit of work processor
-        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
+        CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         routeContext.setOnCompletion(getId(definition, routeContext), internal);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index a2b7cd3..f9bdb54 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -289,7 +289,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
     protected Channel wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition<?> child, Boolean inheritErrorHandler) throws Exception {
         // put a channel in between this and each output to control the route
         // flow logic
-        DefaultChannel channel = new DefaultChannel();
+        DefaultChannel channel = new DefaultChannel(routeContext.getCamelContext());
 
         // add interceptor strategies to the channel must be in this order:
         // camel context, route context, local
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
index 612e020..a1e33ef 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java
@@ -75,7 +75,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Expression expression = definition.getExpression().createExpression(routeContext);
 
         // and wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         ObjectHelper.notNull(config, "config", this);
@@ -108,7 +108,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> {
         Processor processor = this.createChildProcessor(routeContext, true);
         Expression expression = definition.getExpression().createExpression(routeContext);
 
-        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
+        CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         ObjectHelper.notNull(config, "config", this);
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 7a89b1a..1f16c8d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -54,7 +54,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         Processor target = wrapInErrorHandler(routeContext, dynamicTo);
 
         // and wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(target);
+        CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), target);
         internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
 
         // is true by default


[camel] 01/03: CAMEL-14435: camel-core - Optimize getting reactive executor in routing engine

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f807dab045286a1a10e9c5bbce68dac9168793b6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 24 12:24:20 2020 +0100

    CAMEL-14435: camel-core - Optimize getting reactive executor in routing engine
---
 .../apache/camel/impl/engine/BaseRouteService.java |  5 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  5 +++-
 .../camel/impl/engine/DefaultProducerCache.java    |  2 +-
 .../camel/processor/CamelInternalProcessor.java    | 32 ++++++++++++++++++++--
 .../java/org/apache/camel/processor/Pipeline.java  | 11 +++++---
 .../processor/SharedCamelInternalProcessor.java    | 16 ++++++++---
 .../camel/processor/channel/DefaultChannel.java    |  3 ++
 .../errorhandler/RedeliveryErrorHandler.java       | 29 +++++++++++---------
 8 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
index a6017c5..27748fd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Channel;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -174,6 +175,10 @@ public abstract class BaseRouteService extends ChildServiceSupport {
                     if (service instanceof RouteIdAware) {
                         ((RouteIdAware) service).setRouteId(route.getId());
                     }
+                    // inject camel context
+                    if (service instanceof CamelContextAware) {
+                        ((CamelContextAware) service).setCamelContext(camelContext);
+                    }
 
                     if (service instanceof Consumer) {
                         inputs.put(route, (Consumer) service);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 665d779..7631456 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -31,6 +31,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.MessageHelper;
@@ -86,12 +87,14 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
     }
 
     public void await(Exchange exchange, CountDownLatch latch) {
+        ReactiveExecutor reactiveExecutor = exchange.getContext().getReactiveExecutor();
         // Early exit for pending reactive queued work
         do {
             if (latch.getCount() <= 0) {
                 return;
             }
-        } while (exchange.getContext().getReactiveExecutor().executeFromQueue());
+        } while (reactiveExecutor.executeFromQueue());
+
         LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
index 7dd8f9a..635ab7b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java
@@ -74,7 +74,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
         }
 
         // internal processor used for sending
-        internalProcessor = new SharedCamelInternalProcessor(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+        internalProcessor = new SharedCamelInternalProcessor(camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
     }
 
     protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 0820b80..0f82c90 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
@@ -43,6 +44,7 @@ import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
 import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.StreamCachingStrategy;
@@ -89,12 +91,14 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The added advices can implement {@link Ordered} to control in which order the advices are executed.
  */
-public class CamelInternalProcessor extends DelegateAsyncProcessor {
+public class CamelInternalProcessor extends DelegateAsyncProcessor implements CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
 
     private static final Object[] EMPTY_STATES = new Object[0];
 
+    private CamelContext camelContext;
+    private ReactiveExecutor reactiveExecutor;
     private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>();
     private byte statefulAdvices;
 
@@ -105,6 +109,24 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         super(processor);
     }
 
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // optimize to preset reactive executor
+        if (camelContext != null) {
+            reactiveExecutor = camelContext.getReactiveExecutor();
+        }
+    }
+
     /**
      * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
      *
@@ -159,6 +181,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             return true;
         }
 
+        if (reactiveExecutor == null) {
+            reactiveExecutor = exchange.getContext().getReactiveExecutor();
+        }
+
         // optimise to use object array for states, and only for the number of advices that keep state
         final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator object
@@ -198,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // ----------------------------------------------------------
                 // callback must be called
                 if (originalCallback != null) {
-                    exchange.getContext().getReactiveExecutor().schedule(originalCallback);
+                    reactiveExecutor.schedule(originalCallback);
                 }
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
@@ -252,7 +278,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
-                exchange.getContext().getReactiveExecutor().schedule(() -> {
+                reactiveExecutor.schedule(() -> {
                     // execute any after processor work (in current thread, not in the callback)
                     uow.afterProcess(processor, exchange, callback, false);
                 });
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 7557450..3cfba1b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -30,6 +30,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -50,12 +51,14 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
 
     private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
     private List<AsyncProcessor> processors;
     private String id;
     private String routeId;
 
     public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
     }
 
@@ -88,9 +91,9 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
+            reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
+            reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
         }
         return false;
     }
@@ -108,7 +111,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.get(index.getAndIncrement());
 
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, index, false)));
+                    reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, index, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -119,7 +122,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
                 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
             }
 
-            camelContext.getReactiveExecutor().schedule(callback);
+            reactiveExecutor.schedule(callback);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 687b875b..c47fc50 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Ordered;
@@ -31,6 +32,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
@@ -70,10 +72,17 @@ public class SharedCamelInternalProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
     private static final Object[] EMPTY_STATES = new Object[0];
+    private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
+    private final AsyncProcessorAwaitManager awaitManager;
     private final List<CamelInternalProcessorAdvice> advices;
     private byte statefulAdvices;
 
-    public SharedCamelInternalProcessor(CamelInternalProcessorAdvice... advices) {
+    public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) {
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
+
         if (advices != null) {
             this.advices = new ArrayList<>(advices.length);
             for (CamelInternalProcessorAdvice advice : advices) {
@@ -93,7 +102,6 @@ public class SharedCamelInternalProcessor {
      * Synchronous API
      */
     public void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor) {
-        final AsyncProcessorAwaitManager awaitManager = exchange.getContext().adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         awaitManager.process(new AsyncProcessor() {
             @Override
             public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -206,7 +214,7 @@ public class SharedCamelInternalProcessor {
 
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
-                exchange.getContext().getReactiveExecutor().schedule(() -> {
+                reactiveExecutor.schedule(() -> {
                     // execute any after processor work (in current thread, not in the callback)
                     uow.afterProcess(processor, exchange, callback, sync);
                 });
@@ -272,7 +280,7 @@ public class SharedCamelInternalProcessor {
                 // ----------------------------------------------------------
                 // callback must be called
                 if (callback != null) {
-                    exchange.getContext().getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                 }
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 028fb25..92ef735 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -133,6 +133,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         // the output has now been created, so assign the output as the processor
         setProcessor(getOutput());
         ServiceHelper.startService(errorHandler, output);
@@ -140,6 +141,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         if (isRouteScoped()) {
             // only stop services if not context scoped (as context scoped is reused by others)
             ServiceHelper.stopService(output, errorHandler);
@@ -148,6 +150,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
 
     @Override
     protected void doShutdown() throws Exception {
+        super.doShutdown();
         ServiceHelper.stopAndShutdownServices(output, errorHandler);
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 637d2fc..2fcaae2 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -39,6 +39,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.ShutdownPrepared;
 import org.apache.camel.spi.UnitOfWork;
@@ -70,6 +71,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
     protected final CamelContext camelContext;
+    protected final ReactiveExecutor reactiveExecutor;
     protected final AsyncProcessorAwaitManager awaitManager;
     protected final Processor deadLetter;
     protected final String deadLetterUri;
@@ -98,6 +100,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
 
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
@@ -160,9 +163,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(state);
+            reactiveExecutor.scheduleSync(state);
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(state);
+            reactiveExecutor.scheduleMain(state);
         }
         return false;
     }
@@ -440,7 +443,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
                         }
-                        executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+                        executorService.schedule(() -> reactiveExecutor.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
 
                     } else {
                         // async delayed redelivery was disabled or we are transacted so we must be synchronous
@@ -456,9 +459,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                                 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
                                 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                                 // jump to start of loop which then detects that we are failed and exhausted
-                                camelContext.getReactiveExecutor().schedule(this);
+                                reactiveExecutor.schedule(this);
                             } else {
-                                camelContext.getReactiveExecutor().schedule(this::redeliver);
+                                reactiveExecutor.schedule(this::redeliver);
                             }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
@@ -467,12 +470,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                             // mark the exchange to stop continue routing when interrupted
                             // as we do not want to continue routing (for example a task has been cancelled)
                             exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-                            camelContext.getReactiveExecutor().schedule(callback);
+                            reactiveExecutor.schedule(callback);
                         }
                     }
                 } else {
                     // execute the task immediately
-                    camelContext.getReactiveExecutor().schedule(this::redeliver);
+                    reactiveExecutor.schedule(this::redeliver);
                 }
             } else {
                 // Simple delivery
@@ -480,10 +483,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (isDone(exchange)) {
-                        camelContext.getReactiveExecutor().schedule(callback);
+                        reactiveExecutor.schedule(callback);
                     } else {
                         // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        camelContext.getReactiveExecutor().schedule(this);
+                        reactiveExecutor.schedule(this);
                     }
                 });
             }
@@ -561,11 +564,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    camelContext.getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                     return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                    camelContext.getReactiveExecutor().schedule(this);
+                    reactiveExecutor.schedule(this);
                 }
             });
         }
@@ -851,7 +854,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        camelContext.getReactiveExecutor().schedule(callback);
+                        reactiveExecutor.schedule(callback);
                     }
                 });
             } else {
@@ -870,7 +873,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    camelContext.getReactiveExecutor().schedule(callback);
+                    reactiveExecutor.schedule(callback);
                 }
             }