You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/18 19:34:00 UTC

[camel] branch camel-3.7.x updated: CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.7.x by this push:
     new 172d0fb  CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.
172d0fb is described below

commit 172d0fb0767a4892d1b8770bfb6446bc297908ac
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 18 20:32:29 2021 +0100

    CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.
---
 .../resilience4j/ResilienceProcessor.java          | 78 +++++++++++++++++-----
 .../component/resilience4j/ResilienceReifier.java  |  7 +-
 .../model/Resilience4jConfigurationDefinition.java |  2 +-
 3 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 2d8f366..c9bd5ca 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 
 import io.github.resilience4j.bulkhead.Bulkhead;
 import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
 import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
 import io.github.resilience4j.circuitbreaker.CircuitBreaker;
 import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
@@ -66,7 +67,9 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     private String id;
     private final CircuitBreakerConfig circuitBreakerConfig;
     private final BulkheadConfig bulkheadConfig;
+    private Bulkhead bulkhead;
     private final TimeLimiterConfig timeLimiterConfig;
+    private TimeLimiter timeLimiter;
     private final Processor processor;
     private final Processor fallback;
     private boolean shutdownExecutorService;
@@ -83,6 +86,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        if (timeLimiterConfig != null) {
+            timeLimiter = TimeLimiter.of(id, timeLimiterConfig);
+        }
+        if (bulkheadConfig != null) {
+            bulkhead = Bulkhead.of(id, bulkheadConfig);
+        }
+    }
+
+    @Override
     public CamelContext getCamelContext() {
         return camelContext;
     }
@@ -357,31 +371,39 @@ public class ResilienceProcessor extends AsyncProcessorSupport
 
         Callable<Exchange> task;
 
-        if (timeLimiterConfig != null) {
-            // timeout handling is more complex with thread-pools
-
-            TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig);
+        if (timeLimiter != null) {
             Supplier<CompletableFuture<Exchange>> futureSupplier;
             if (executorService == null) {
                 futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange));
             } else {
                 futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService);
             }
-            task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier);
+            task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
         } else {
             task = new CircuitBreakerTask(() -> processInCopy(exchange));
         }
 
-        if (bulkheadConfig != null) {
-            Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
-            task = Bulkhead.decorateCallable(bh, task);
+        if (bulkhead != null) {
+            task = Bulkhead.decorateCallable(bulkhead, task);
         }
 
         task = CircuitBreaker.decorateCallable(circuitBreaker, task);
-
-        Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.fallback, exchange);
-        Try.ofCallable(task).recover(fallbackTask).andFinally(() -> callback.done(false)).get();
-        return false;
+        Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
+            }
+            Try.ofCallable(task).recover(fallbackTask).get();
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+        if (LOG.isTraceEnabled()) {
+            boolean failed = exchange.isFailed();
+            LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id,
+                    failed);
+        }
+        callback.done(true);
+        return true;
     }
 
     private Exchange processInCopy(Exchange exchange) {
@@ -442,16 +464,23 @@ public class ResilienceProcessor extends AsyncProcessorSupport
 
     private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
 
+        private final String id;
         private final Processor processor;
         private final Exchange exchange;
 
-        private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) {
+        private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) {
+            this.id = id;
             this.processor = processor;
             this.exchange = exchange;
         }
 
         @Override
         public Exchange apply(Throwable throwable) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchange: {} recover task using circuit breaker: {} from: {}", exchange.getExchangeId(),
+                        id, throwable);
+            }
+
             if (processor == null) {
                 if (throwable instanceof TimeoutException) {
                     // the circuit breaker triggered a timeout (and there is no
@@ -464,14 +493,29 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                     return exchange;
                 } else if (throwable instanceof CallNotPermittedException) {
                     // the circuit breaker triggered a call rejected
+                    // where the circuit breaker is half-open / open and therefore
+                    // we should just set properties and do not set any exception
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+                    return exchange;
+                } else if (throwable instanceof BulkheadFullException) {
+                    // the circuit breaker bulkhead is full
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
-                    throw RuntimeExchangeException.wrapRuntimeException(throwable);
+                    exchange.setException(throwable);
+                    return exchange;
                 } else {
-                    // throw exception so resilient4j know it was a failure
-                    throw RuntimeExchangeException.wrapRuntimeException(throwable);
+                    // other kind of exception
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+                    exchange.setException(throwable);
+                    return exchange;
                 }
             }
 
@@ -498,7 +542,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                 // process the fallback until its fully done
                 processor.process(exchange);
                 LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 exchange.setException(e);
             }
 
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 76c9597..c65eb29 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -120,7 +120,12 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
             builder.maxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls()));
         }
         if (config.getBulkheadMaxWaitDuration() != null) {
-            builder.maxWaitDuration(Duration.ofMillis(parseLong(config.getBulkheadMaxWaitDuration())));
+            long duration = parseLong(config.getBulkheadMaxWaitDuration());
+            if (duration <= 0) {
+                builder.maxWaitDuration(Duration.ZERO);
+            } else {
+                builder.maxWaitDuration(Duration.ofMillis(duration));
+            }
         }
         return builder.build();
     }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 9fb6891..77468c2 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -198,7 +198,7 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati
      * Configures the max amount of concurrent calls the bulkhead will support.
      */
     public Resilience4jConfigurationDefinition bulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) {
-        setBulkheadMaxWaitDuration(Integer.toString(bulkheadMaxConcurrentCalls));
+        setBulkheadMaxConcurrentCalls(Integer.toString(bulkheadMaxConcurrentCalls));
         return this;
     }