You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/20 06:14:33 UTC

[camel] 03/18: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e324d4eae093c00ba390ece7c8d08a8e34bfb99
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:01:12 2019 +0200

    camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     |  4 ++++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  3 +--
 .../camel/impl/engine/DefaultReactiveExecutor.java |  6 +++++
 .../camel/processor/CamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/LoopProcessor.java  |  7 +++---
 .../apache/camel/processor/MulticastProcessor.java | 13 +++++------
 .../java/org/apache/camel/processor/Pipeline.java  |  9 ++++----
 .../processor/SharedCamelInternalProcessor.java    |  5 ++--
 .../org/apache/camel/processor/TryProcessor.java   |  5 ++--
 .../processor/aggregate/AggregateProcessor.java    |  3 +--
 .../errorhandler/RedeliveryErrorHandler.java       | 27 +++++++++++-----------
 .../loadbalancer/FailOverLoadBalancer.java         |  5 ++--
 .../processor/loadbalancer/TopicLoadBalancer.java  |  5 ++--
 13 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 8987bd3..4a21127 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.AsyncCallback;
+
 /**
  * SPI to plugin different reactive engines in the Camel routing engine.
  */
@@ -38,4 +40,6 @@ public interface ReactiveExecutor {
 
     boolean executeFromQueue();
 
+    void callback(AsyncCallback callback);
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 3a2b41c..8087942 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -34,7 +34,6 @@ import org.apache.camel.StaticService;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceSupport;
 
@@ -88,7 +87,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             if (latch.getCount() <= 0) {
                 return;
             }
-        } while (ReactiveHelper.executeFromQueue());
+        } while (exchange.getContext().getReactiveExecutor().executeFromQueue());
         log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
                 exchange.getExchangeId(), exchange);
         try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 6a9473b..d700e3c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.support.ReactiveHelper;
 
@@ -58,4 +59,9 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
     public boolean executeFromQueue() {
         return ReactiveHelper.executeFromQueue();
     }
+
+    @Override
+    public void callback(AsyncCallback callback) {
+        ReactiveHelper.callback(callback);
+    }
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 27faa0b..f039f27 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -48,7 +48,6 @@ import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.StopWatch;
@@ -175,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(ocallback);
+                exchange.getContext().getReactiveExecutor().callback(ocallback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
@@ -225,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, false);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 75329bc..1628c85 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -25,7 +25,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -54,9 +53,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
             return false;
 
@@ -113,7 +112,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     processor.process(current, doneSync -> {
                         // increment counter after done
                         index++;
-                        ReactiveHelper.schedule(this);
+                        exchange.getContext().getReactiveExecutor().schedule(this);
                     });
                 } else {
                     // we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 594c936..4d0806f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -56,7 +56,6 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
@@ -220,12 +219,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         MulticastState state = new MulticastState(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(state));
+            executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
         } else {
             if (exchange.isTransacted()) {
-                ReactiveHelper.scheduleSync(state);
+                exchange.getContext().getReactiveExecutor().scheduleSync(state);
             } else {
-                ReactiveHelper.scheduleMain(state);
+                exchange.getContext().getReactiveExecutor().scheduleMain(state);
             }
         }
 
@@ -237,9 +236,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected void schedule(Runnable runnable) {
         if (isParallelProcessing()) {
-            executorService.submit(() -> ReactiveHelper.schedule(runnable));
+            executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
         } else {
-            ReactiveHelper.schedule(runnable, "Multicast next step");
+            camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step");
         }
     }
 
@@ -524,7 +523,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
             original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
         }
 
-        ReactiveHelper.callback(callback);
+        camelContext.getReactiveExecutor().callback(callback);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 077c8de..9ffe248 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -33,7 +33,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -82,10 +81,10 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         } else {
-            ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
                     "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
         }
         return false;
@@ -105,7 +104,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.next();
 
             processor.process(exchange, doneSync ->
-                    ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false),
                             "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
@@ -115,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
             log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
 
-            ReactiveHelper.callback(callback);
+            camelContext.getReactiveExecutor().callback(callback);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 0cf2654..1a5e92d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -192,7 +191,7 @@ public class SharedCamelInternalProcessor {
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
 
-            ReactiveHelper.schedule(() -> {
+            exchange.getContext().getReactiveExecutor().schedule(() -> {
                 // execute any after processor work (in current thread, not in the callback)
                 if (uow != null) {
                     uow.afterProcess(processor, exchange, callback, sync);
@@ -255,7 +254,7 @@ public class SharedCamelInternalProcessor {
                 // CAMEL END USER - DEBUG ME HERE +++ START +++
                 // ----------------------------------------------------------
                 // callback must be called
-                ReactiveHelper.callback(callback);
+                exchange.getContext().getReactiveExecutor().callback(callback);
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index 264ba2b..9b1d50d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,7 +30,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
@@ -61,7 +60,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
 
-        ReactiveHelper.schedule(new TryState(exchange, callback));
+        exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
         return false;
     }
 
@@ -90,7 +89,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
                 Processor processor = processors.next();
                 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
                 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+                async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
             } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 2ccde2e..f4508dd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -64,7 +64,6 @@ import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.NoLock;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -770,7 +769,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         // send this exchange
         // the call to schedule last if needed to ensure in-order processing of the aggregates
-        executorService.submit(() -> ReactiveHelper.scheduleSync(() -> processor.process(exchange, done -> {
+        executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
             // log exception if there was a problem
             if (exchange.getException() != null) {
                 // if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 782d8a7..2628c67 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -48,7 +48,6 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.support.processor.DefaultExchangeFormatter;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -153,9 +152,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         RedeliveryState state = new RedeliveryState(exchange, callback);
         // Run it
         if (exchange.isTransacted()) {
-            ReactiveHelper.scheduleSync(state);
+            camelContext.getReactiveExecutor().scheduleSync(state);
         } else {
-            ReactiveHelper.scheduleMain(state);
+            camelContext.getReactiveExecutor().scheduleMain(state);
         }
         return false;
     }
@@ -442,7 +441,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         if (log.isTraceEnabled()) {
                             log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
                         }
-                        executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+                        executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
 
                     } else {
                         // async delayed redelivery was disabled or we are transacted so we must be synchronous
@@ -458,9 +457,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                                 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
                                 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
                                 // jump to start of loop which then detects that we are failed and exhausted
-                                ReactiveHelper.schedule(this);
+                                camelContext.getReactiveExecutor().schedule(this);
                             } else {
-                                ReactiveHelper.schedule(this::redeliver);
+                                camelContext.getReactiveExecutor().schedule(this::redeliver);
                             }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
@@ -469,12 +468,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                             // mark the exchange to stop continue routing when interrupted
                             // as we do not want to continue routing (for example a task has been cancelled)
                             exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
-                            ReactiveHelper.callback(callback);
+                            camelContext.getReactiveExecutor().callback(callback);
                         }
                     }
                 } else {
                     // execute the task immediately
-                    ReactiveHelper.schedule(this::redeliver);
+                    camelContext.getReactiveExecutor().schedule(this::redeliver);
                 }
             } else {
                 // Simple delivery
@@ -482,10 +481,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
                     if (isDone(exchange)) {
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     } else {
                         // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                        ReactiveHelper.schedule(this);
+                        camelContext.getReactiveExecutor().schedule(this);
                     }
                 });
             }
@@ -563,11 +562,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                     return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
-                    ReactiveHelper.schedule(this);
+                    camelContext.getReactiveExecutor().schedule(this);
                 }
             });
         }
@@ -845,7 +844,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
                     } finally {
                         // if the fault was handled asynchronously, this should be reflected in the callback as well
-                        ReactiveHelper.callback(callback);
+                        camelContext.getReactiveExecutor().callback(callback);
                     }
                 });
             } else {
@@ -864,7 +863,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                     prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
                 } finally {
                     // callback we are done
-                    ReactiveHelper.callback(callback);
+                    camelContext.getReactiveExecutor().callback(callback);
                 }
             }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 059c795..89a6f91 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -159,7 +158,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -246,7 +245,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
             // process the exchange
             log.debug("Processing failover at attempt {} for {}", attempts, copy);
-            processor.process(copy, doneSync -> ReactiveHelper.schedule(this::run));
+            processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
         }
 
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index 09ba098..ebd0f87 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.support.ReactiveHelper;
 
 /**
  * A {@link LoadBalancer} implementations which sends to all destinations
@@ -33,7 +32,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -64,7 +63,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
                 exchange.setException(current.getException());
                 callback.done(false);
             } else {
-                ReactiveHelper.schedule(this::run);
+                exchange.getContext().getReactiveExecutor().schedule(this::run);
             }
         }
     }