You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/08 12:09:14 UTC
[camel] 01/02: CAMEL-16455: Optimize CircuitBreaker EIP with task
pooling
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 71f6737d342c901471154900754a3ed7853ca038
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Apr 8 11:42:32 2021 +0200
CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
---
.../resilience4j/ResilienceProcessor.java | 86 +++++++++++++++++-----
1 file changed, 68 insertions(+), 18 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 57f6f49..864bd0f 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -47,6 +47,10 @@ import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.RouteIdAware;
@@ -82,6 +86,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private boolean shutdownExecutorService;
private ExecutorService executorService;
private ProcessorExchangeFactory processorExchangeFactory;
+ private PooledExchangeTaskFactory taskFactory;
public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig,
TimeLimiterConfig timeLimiterConfig, Processor processor,
@@ -104,13 +109,33 @@ public class ResilienceProcessor extends AsyncProcessorSupport
bulkhead = Bulkhead.of(id, bulkheadConfig);
}
+ boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+ if (pooled) {
+ taskFactory = new PooledTaskFactory(getId()) {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerTask();
+ }
+ };
+ int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory.setCapacity(capacity);
+ } else {
+ taskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return new CircuitBreakerTask();
+ }
+ };
+ }
+ LOG.trace("Using TaskFactory: {}", taskFactory);
+
// create a per processor exchange factory
this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
this.processorExchangeFactory.setRouteId(getRouteId());
this.processorExchangeFactory.setId(getId());
- ServiceHelper.buildService(processorExchangeFactory, processor);
+ ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
}
@Override
@@ -119,7 +144,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
}
- ServiceHelper.startService(processorExchangeFactory, processor);
+ ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
}
@Override
@@ -128,12 +153,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
}
- ServiceHelper.stopService(processorExchangeFactory, processor);
+ ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor);
+ ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
}
@Override
@@ -419,39 +444,43 @@ public class ResilienceProcessor extends AsyncProcessorSupport
// Camel error handler
exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
- Callable<Exchange> task;
+ CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
+ Callable<Exchange> callable;
if (timeLimiter != null) {
Supplier<CompletableFuture<Exchange>> futureSupplier;
if (executorService == null) {
- futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange));
+ futureSupplier = () -> CompletableFuture.supplyAsync(task);
} else {
- futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange), executorService);
+ futureSupplier = () -> CompletableFuture.supplyAsync(task, executorService);
}
- task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
+ callable = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
} else {
- task = new CircuitBreakerTask(() -> processTask(exchange));
+ callable = task;
}
-
if (bulkhead != null) {
- task = Bulkhead.decorateCallable(bulkhead, task);
+ callable = Bulkhead.decorateCallable(bulkhead, task);
}
- task = CircuitBreaker.decorateCallable(circuitBreaker, task);
+ callable = CircuitBreaker.decorateCallable(circuitBreaker, callable);
Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
}
- Try.ofCallable(task).recover(fallbackTask).get();
+ Try.ofCallable(callable).recover(fallbackTask).get();
} catch (Exception e) {
exchange.setException(e);
+ } finally {
+ taskFactory.release(task);
}
+
if (LOG.isTraceEnabled()) {
boolean failed = exchange.isFailed();
LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id,
failed);
}
+
callback.done(true);
return true;
}
@@ -505,17 +534,38 @@ public class ResilienceProcessor extends AsyncProcessorSupport
return exchange;
}
- private static final class CircuitBreakerTask implements Callable<Exchange> {
+ private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange>, Supplier<Exchange> {
+
+ private Exchange exchange;
- Supplier<Exchange> supplier;
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ // callback not in use
+ }
+
+ @Override
+ public void reset() {
+ this.exchange = null;
+ }
- public CircuitBreakerTask(Supplier<Exchange> supplier) {
- this.supplier = supplier;
+ @Override
+ public void run() {
+ // not in use
}
@Override
public Exchange call() throws Exception {
- return supplier.get();
+ // this task is either use as callable or supplier
+ // therefore we must call process task before returning the response
+ return processTask(exchange);
+ }
+
+ @Override
+ public Exchange get() {
+ // this task is either use as callable or supplier
+ // therefore we must call process task before returning the response
+ return processTask(exchange);
}
}