You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/06/20 06:14:33 UTC
[camel] 03/18: camel3 - SPI for ReactiveHelper so we can plugin
different reactive engines
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3e324d4eae093c00ba390ece7c8d08a8e34bfb99
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 12 13:01:12 2019 +0200
camel3 - SPI for ReactiveHelper so we can plugin different reactive engines
---
.../org/apache/camel/spi/ReactiveExecutor.java | 4 ++++
.../engine/DefaultAsyncProcessorAwaitManager.java | 3 +--
.../camel/impl/engine/DefaultReactiveExecutor.java | 6 +++++
.../camel/processor/CamelInternalProcessor.java | 5 ++--
.../org/apache/camel/processor/LoopProcessor.java | 7 +++---
.../apache/camel/processor/MulticastProcessor.java | 13 +++++------
.../java/org/apache/camel/processor/Pipeline.java | 9 ++++----
.../processor/SharedCamelInternalProcessor.java | 5 ++--
.../org/apache/camel/processor/TryProcessor.java | 5 ++--
.../processor/aggregate/AggregateProcessor.java | 3 +--
.../errorhandler/RedeliveryErrorHandler.java | 27 +++++++++++-----------
.../loadbalancer/FailOverLoadBalancer.java | 5 ++--
.../processor/loadbalancer/TopicLoadBalancer.java | 5 ++--
13 files changed, 48 insertions(+), 49 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 8987bd3..4a21127 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.spi;
+import org.apache.camel.AsyncCallback;
+
/**
* SPI to plugin different reactive engines in the Camel routing engine.
*/
@@ -38,4 +40,6 @@ public interface ReactiveExecutor {
boolean executeFromQueue();
+ void callback(AsyncCallback callback);
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 3a2b41c..8087942 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -34,7 +34,6 @@ import org.apache.camel.StaticService;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.processor.DefaultExchangeFormatter;
import org.apache.camel.support.service.ServiceSupport;
@@ -88,7 +87,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
if (latch.getCount() <= 0) {
return;
}
- } while (ReactiveHelper.executeFromQueue());
+ } while (exchange.getContext().getReactiveExecutor().executeFromQueue());
log.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 6a9473b..d700e3c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.impl.engine;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.support.ReactiveHelper;
@@ -58,4 +59,9 @@ public class DefaultReactiveExecutor implements ReactiveExecutor {
public boolean executeFromQueue() {
return ReactiveHelper.executeFromQueue();
}
+
+ @Override
+ public void callback(AsyncCallback callback) {
+ ReactiveHelper.callback(callback);
+ }
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 27faa0b..f039f27 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -48,7 +48,6 @@ import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.util.StopWatch;
@@ -175,7 +174,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
// CAMEL END USER - DEBUG ME HERE +++ START +++
// ----------------------------------------------------------
// callback must be called
- ReactiveHelper.callback(ocallback);
+ exchange.getContext().getReactiveExecutor().callback(ocallback);
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
@@ -225,7 +224,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- ReactiveHelper.schedule(() -> {
+ exchange.getContext().getReactiveExecutor().schedule(() -> {
// execute any after processor work (in current thread, not in the callback)
if (uow != null) {
uow.afterProcess(processor, exchange, callback, false);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 75329bc..1628c85 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -25,7 +25,6 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -54,9 +53,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
LoopState state = new LoopState(exchange, callback);
if (exchange.isTransacted()) {
- ReactiveHelper.scheduleSync(state);
+ exchange.getContext().getReactiveExecutor().scheduleSync(state);
} else {
- ReactiveHelper.scheduleMain(state);
+ exchange.getContext().getReactiveExecutor().scheduleMain(state);
}
return false;
@@ -113,7 +112,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
processor.process(current, doneSync -> {
// increment counter after done
index++;
- ReactiveHelper.schedule(this);
+ exchange.getContext().getReactiveExecutor().schedule(this);
});
} else {
// we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 594c936..4d0806f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -56,7 +56,6 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
@@ -220,12 +219,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
MulticastState state = new MulticastState(exchange, pairs, callback);
if (isParallelProcessing()) {
- executorService.submit(() -> ReactiveHelper.schedule(state));
+ executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
} else {
if (exchange.isTransacted()) {
- ReactiveHelper.scheduleSync(state);
+ exchange.getContext().getReactiveExecutor().scheduleSync(state);
} else {
- ReactiveHelper.scheduleMain(state);
+ exchange.getContext().getReactiveExecutor().scheduleMain(state);
}
}
@@ -237,9 +236,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
protected void schedule(Runnable runnable) {
if (isParallelProcessing()) {
- executorService.submit(() -> ReactiveHelper.schedule(runnable));
+ executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
} else {
- ReactiveHelper.schedule(runnable, "Multicast next step");
+ camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step");
}
}
@@ -524,7 +523,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
}
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
}
/**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 077c8de..9ffe248 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -33,7 +33,6 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.service.ServiceHelper;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
@@ -82,10 +81,10 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (exchange.isTransacted()) {
- ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+ camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
} else {
- ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
+ camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
}
return false;
@@ -105,7 +104,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
AsyncProcessor processor = processors.next();
processor.process(exchange, doneSync ->
- ReactiveHelper.schedule(() -> doProcess(exchange, callback, processors, false),
+ camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
} else {
ExchangeHelper.copyResults(exchange, exchange);
@@ -115,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 0cf2654..1a5e92d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -36,7 +36,6 @@ import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.OrderedComparator;
-import org.apache.camel.support.ReactiveHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,7 +191,7 @@ public class SharedCamelInternalProcessor {
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- ReactiveHelper.schedule(() -> {
+ exchange.getContext().getReactiveExecutor().schedule(() -> {
// execute any after processor work (in current thread, not in the callback)
if (uow != null) {
uow.afterProcess(processor, exchange, callback, sync);
@@ -255,7 +254,7 @@ public class SharedCamelInternalProcessor {
// CAMEL END USER - DEBUG ME HERE +++ START +++
// ----------------------------------------------------------
// callback must be called
- ReactiveHelper.callback(callback);
+ exchange.getContext().getReactiveExecutor().callback(callback);
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index 264ba2b..9b1d50d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -30,7 +30,6 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.service.ServiceHelper;
/**
@@ -61,7 +60,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
public boolean process(Exchange exchange, AsyncCallback callback) {
- ReactiveHelper.schedule(new TryState(exchange, callback));
+ exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
return false;
}
@@ -90,7 +89,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
Processor processor = processors.next();
AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- async.process(exchange, doneSync -> ReactiveHelper.schedule(this));
+ async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
} else {
ExchangeHelper.prepareOutToIn(exchange);
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 2ccde2e..f4508dd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -64,7 +64,6 @@ import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.NoLock;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
@@ -770,7 +769,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
// send this exchange
// the call to schedule last if needed to ensure in-order processing of the aggregates
- executorService.submit(() -> ReactiveHelper.scheduleSync(() -> processor.process(exchange, done -> {
+ executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
// log exception if there was a problem
if (exchange.getException() != null) {
// if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 782d8a7..2628c67 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -48,7 +48,6 @@ import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.support.processor.DefaultExchangeFormatter;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
@@ -153,9 +152,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
RedeliveryState state = new RedeliveryState(exchange, callback);
// Run it
if (exchange.isTransacted()) {
- ReactiveHelper.scheduleSync(state);
+ camelContext.getReactiveExecutor().scheduleSync(state);
} else {
- ReactiveHelper.scheduleMain(state);
+ camelContext.getReactiveExecutor().scheduleMain(state);
}
return false;
}
@@ -442,7 +441,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
if (log.isTraceEnabled()) {
log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, exchange.getExchangeId());
}
- executorService.schedule(() -> ReactiveHelper.schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
+ executorService.schedule(() -> camelContext.getReactiveExecutor().schedule(this::redeliver), redeliveryDelay, TimeUnit.MILLISECONDS);
} else {
// async delayed redelivery was disabled or we are transacted so we must be synchronous
@@ -458,9 +457,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
// jump to start of loop which then detects that we are failed and exhausted
- ReactiveHelper.schedule(this);
+ camelContext.getReactiveExecutor().schedule(this);
} else {
- ReactiveHelper.schedule(this::redeliver);
+ camelContext.getReactiveExecutor().schedule(this::redeliver);
}
} catch (InterruptedException e) {
redeliverySleepCounter.decrementAndGet();
@@ -469,12 +468,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// mark the exchange to stop continue routing when interrupted
// as we do not want to continue routing (for example a task has been cancelled)
exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
}
}
} else {
// execute the task immediately
- ReactiveHelper.schedule(this::redeliver);
+ camelContext.getReactiveExecutor().schedule(this::redeliver);
}
} else {
// Simple delivery
@@ -482,10 +481,10 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
if (isDone(exchange)) {
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
} else {
// error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- ReactiveHelper.schedule(this);
+ camelContext.getReactiveExecutor().schedule(this);
}
});
}
@@ -563,11 +562,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
if (isDone(exchange)) {
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
return;
} else {
// error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- ReactiveHelper.schedule(this);
+ camelContext.getReactiveExecutor().schedule(this);
}
});
}
@@ -845,7 +844,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
} finally {
// if the fault was handled asynchronously, this should be reflected in the callback as well
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
}
});
} else {
@@ -864,7 +863,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
} finally {
// callback we are done
- ReactiveHelper.callback(callback);
+ camelContext.getReactiveExecutor().callback(callback);
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 059c795..89a6f91 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.ReactiveHelper;
import org.apache.camel.util.ObjectHelper;
/**
@@ -159,7 +158,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
public boolean process(final Exchange exchange, final AsyncCallback callback) {
AsyncProcessor[] processors = doGetProcessors();
- ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+ exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
return false;
}
@@ -246,7 +245,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
// process the exchange
log.debug("Processing failover at attempt {} for {}", attempts, copy);
- processor.process(copy, doneSync -> ReactiveHelper.schedule(this::run));
+ processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index 09ba098..ebd0f87 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.support.ReactiveHelper;
/**
* A {@link LoadBalancer} implementations which sends to all destinations
@@ -33,7 +32,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
public boolean process(final Exchange exchange, final AsyncCallback callback) {
AsyncProcessor[] processors = doGetProcessors();
- ReactiveHelper.schedule(new State(exchange, callback, processors)::run);
+ exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
return false;
}
@@ -64,7 +63,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
exchange.setException(current.getException());
callback.done(false);
} else {
- ReactiveHelper.schedule(this::run);
+ exchange.getContext().getReactiveExecutor().schedule(this::run);
}
}
}