You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/08 12:09:15 UTC
[camel] 02/02: CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e8bbb704187ba031cf82a2e329a68b6a3bc7e64b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Apr 8 12:01:35 2021 +0200
CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
---
.../resilience4j/ResilienceProcessor.java | 58 +++++++++++++++-------
1 file changed, 40 insertions(+), 18 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 864bd0f..05377ba 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -87,6 +87,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private ExecutorService executorService;
private ProcessorExchangeFactory processorExchangeFactory;
private PooledExchangeTaskFactory taskFactory;
+ private PooledExchangeTaskFactory fallbackTaskFactory;
public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig,
TimeLimiterConfig timeLimiterConfig, Processor processor,
@@ -111,14 +112,21 @@ public class ResilienceProcessor extends AsyncProcessorSupport
boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
if (pooled) {
+ int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
taskFactory = new PooledTaskFactory(getId()) {
@Override
public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
return new CircuitBreakerTask();
}
};
- int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
taskFactory.setCapacity(capacity);
+ fallbackTaskFactory = new PooledTaskFactory(getId()) {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerFallbackTask();
+ }
+ };
+ fallbackTaskFactory.setCapacity(capacity);
} else {
taskFactory = new PrototypeTaskFactory() {
@Override
@@ -126,8 +134,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
return new CircuitBreakerTask();
}
};
+ fallbackTaskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerFallbackTask();
+ }
+ };
}
- LOG.trace("Using TaskFactory: {}", taskFactory);
// create a per processor exchange factory
this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
@@ -135,7 +148,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
this.processorExchangeFactory.setRouteId(getRouteId());
this.processorExchangeFactory.setId(getId());
- ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
+ ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
@Override
@@ -144,7 +157,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
}
- ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
+ ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
@Override
@@ -153,12 +166,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
}
- ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
+ ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
+ ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
@Override
@@ -444,6 +457,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
// Camel error handler
exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
+ CircuitBreakerFallbackTask fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback);
CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
Callable<Exchange> callable;
@@ -463,7 +477,6 @@ public class ResilienceProcessor extends AsyncProcessorSupport
}
callable = CircuitBreaker.decorateCallable(circuitBreaker, callable);
- Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
@@ -473,6 +486,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
exchange.setException(e);
} finally {
taskFactory.release(task);
+ fallbackTaskFactory.release(fallbackTask);
}
if (LOG.isTraceEnabled()) {
@@ -569,16 +583,24 @@ public class ResilienceProcessor extends AsyncProcessorSupport
}
}
- private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
+ private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Function<Throwable, Exchange> {
- private final String id;
- private final Processor processor;
- private final Exchange exchange;
+ private Exchange exchange;
- private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) {
- this.id = id;
- this.processor = processor;
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.exchange = exchange;
+ // callback not in use
+ }
+
+ @Override
+ public void reset() {
+ this.exchange = null;
+ }
+
+ @Override
+ public void run() {
+ // not in use
}
@Override
@@ -588,7 +610,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
id, throwable);
}
- if (processor == null) {
+ if (fallback == null) {
if (throwable instanceof TimeoutException) {
// the circuit breaker triggered a timeout (and there is no
// fallback) so lets mark the exchange as failed
@@ -646,10 +668,10 @@ public class ResilienceProcessor extends AsyncProcessorSupport
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
// run the fallback processor
try {
- LOG.debug("Running fallback: {} with exchange: {}", processor, exchange);
+ LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
// process the fallback until its fully done
- processor.process(exchange);
- LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
+ fallback.process(exchange);
+ LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
} catch (Throwable e) {
exchange.setException(e);
}