You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/08 12:09:15 UTC

[camel] 02/02: CAMEL-16455: Optimize CircuitBreaker EIP with task pooling

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e8bbb704187ba031cf82a2e329a68b6a3bc7e64b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Apr 8 12:01:35 2021 +0200

    CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
---
 .../resilience4j/ResilienceProcessor.java          | 58 +++++++++++++++-------
 1 file changed, 40 insertions(+), 18 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 864bd0f..05377ba 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -87,6 +87,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private ExecutorService executorService;
     private ProcessorExchangeFactory processorExchangeFactory;
     private PooledExchangeTaskFactory taskFactory;
+    private PooledExchangeTaskFactory fallbackTaskFactory;
 
     public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig,
                                TimeLimiterConfig timeLimiterConfig, Processor processor,
@@ -111,14 +112,21 @@ public class ResilienceProcessor extends AsyncProcessorSupport
 
         boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
             taskFactory = new PooledTaskFactory(getId()) {
                 @Override
                 public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
                     return new CircuitBreakerTask();
                 }
             };
-            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
             taskFactory.setCapacity(capacity);
+            fallbackTaskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+            fallbackTaskFactory.setCapacity(capacity);
         } else {
             taskFactory = new PrototypeTaskFactory() {
                 @Override
@@ -126,8 +134,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                     return new CircuitBreakerTask();
                 }
             };
+            fallbackTaskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
         }
-        LOG.trace("Using TaskFactory: {}", taskFactory);
 
         // create a per processor exchange factory
         this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
@@ -135,7 +148,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         this.processorExchangeFactory.setRouteId(getRouteId());
         this.processorExchangeFactory.setId(getId());
 
-        ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor);
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
     }
 
     @Override
@@ -144,7 +157,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
         }
 
-        ServiceHelper.startService(processorExchangeFactory, taskFactory, processor);
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
     }
 
     @Override
@@ -153,12 +166,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
 
-        ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor);
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor);
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
     }
 
     @Override
@@ -444,6 +457,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         // Camel error handler
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
+        CircuitBreakerFallbackTask fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback);
         CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback);
         Callable<Exchange> callable;
 
@@ -463,7 +477,6 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         }
 
         callable = CircuitBreaker.decorateCallable(circuitBreaker, callable);
-        Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
         try {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
@@ -473,6 +486,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             exchange.setException(e);
         } finally {
             taskFactory.release(task);
+            fallbackTaskFactory.release(fallbackTask);
         }
 
         if (LOG.isTraceEnabled()) {
@@ -569,16 +583,24 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         }
     }
 
-    private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
+    private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Function<Throwable, Exchange> {
 
-        private final String id;
-        private final Processor processor;
-        private final Exchange exchange;
+        private Exchange exchange;
 
-        private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) {
-            this.id = id;
-            this.processor = processor;
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
         }
 
         @Override
@@ -588,7 +610,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                         id, throwable);
             }
 
-            if (processor == null) {
+            if (fallback == null) {
                 if (throwable instanceof TimeoutException) {
                     // the circuit breaker triggered a timeout (and there is no
                     // fallback) so lets mark the exchange as failed
@@ -646,10 +668,10 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
             // run the fallback processor
             try {
-                LOG.debug("Running fallback: {} with exchange: {}", processor, exchange);
+                LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
                 // process the fallback until its fully done
-                processor.process(exchange);
-                LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
+                fallback.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
             } catch (Throwable e) {
                 exchange.setException(e);
             }