You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/08 12:09:14 UTC

[camel] 01/02: CAMEL-16455: Optimize CircuitBreaker EIP with task pooling

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 71f6737d342c901471154900754a3ed7853ca038
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Apr 8 11:42:32 2021 +0200

    CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
---
 .../resilience4j/ResilienceProcessor.java          | 86 +++++++++++++++++-----
 1 file changed, 68 insertions(+), 18 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 57f6f49..864bd0f 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -47,6 +47,10 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -82,6 +86,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private boolean shutdownExecutorService;
     private ExecutorService executorService;
     private ProcessorExchangeFactory processorExchangeFactory;
+    private PooledExchangeTaskFactory taskFactory;
 
     public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig,
                                TimeLimiterConfig timeLimiterConfig, Processor processor,
@@ -104,13 +109,33 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             bulkhead = Bulkhead.of(id, bulkheadConfig);
         }
 
+        boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
         // create a per processor exchange factory
         this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
                 .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
         this.processorExchangeFactory.setRouteId(getRouteId());
         this.processorExchangeFactory.setId(getId());
 
-        ServiceHelper.buildService(processorExchangeFactory, processor);
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
@@ -119,7 +144,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
         }
 
-        ServiceHelper.startService(processorExchangeFactory, processor);
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
@@ -128,12 +153,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
 
-        ServiceHelper.stopService(processorExchangeFactory, processor);
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor);
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
     }
 
     @Override
@@ -419,39 +444,43 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         // Camel error handler
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
-        Callable<Exchange> task;
+        CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
+        Callable<Exchange> callable;
 
         if (timeLimiter != null) {
             Supplier<CompletableFuture<Exchange>> futureSupplier;
             if (executorService == null) {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange));
+                futureSupplier = () -> CompletableFuture.supplyAsync(task);
             } else {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange), executorService);
+                futureSupplier = () -> CompletableFuture.supplyAsync(task, executorService);
             }
-            task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
+            callable = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
         } else {
-            task = new CircuitBreakerTask(() -> processTask(exchange));
+            callable = task;
         }
-
         if (bulkhead != null) {
-            task = Bulkhead.decorateCallable(bulkhead, task);
+            callable = Bulkhead.decorateCallable(bulkhead, callable); // wrap the chain so the time limiter is kept
         }
 
-        task = CircuitBreaker.decorateCallable(circuitBreaker, task);
+        callable = CircuitBreaker.decorateCallable(circuitBreaker, callable);
         Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
         try {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
             }
-            Try.ofCallable(task).recover(fallbackTask).get();
+            Try.ofCallable(callable).recover(fallbackTask).get();
         } catch (Exception e) {
             exchange.setException(e);
+        } finally {
+            taskFactory.release(task);
         }
+
         if (LOG.isTraceEnabled()) {
             boolean failed = exchange.isFailed();
             LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id,
                     failed);
         }
+
         callback.done(true);
         return true;
     }
@@ -505,17 +534,38 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         return exchange;
     }
 
-    private static final class CircuitBreakerTask implements Callable<Exchange> {
+    private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange>, Supplier<Exchange> {
+
+        private Exchange exchange;
 
-        Supplier<Exchange> supplier;
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // the callback is not used by this task; only the exchange is kept
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
 
-        public CircuitBreakerTask(Supplier<Exchange> supplier) {
-            this.supplier = supplier;
+        @Override
+        public void run() {
+            // not in use: this task is invoked through call() or get() instead
         }
 
         @Override
         public Exchange call() throws Exception {
-            return supplier.get();
+            // this task is used either as a Callable or as a Supplier,
+            // so each entry point must run processTask itself before returning the exchange
+            return processTask(exchange);
+        }
+
+        @Override
+        public Exchange get() {
+            // this task is used either as a Callable or as a Supplier,
+            // so each entry point must run processTask itself before returning the exchange
+            return processTask(exchange);
         }
     }