You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/18 19:34:00 UTC
[camel] branch camel-3.7.x updated: CAMEL-16173: Camel Resilience4j
Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund
Isaksen for the excellent reproducer example.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 172d0fb CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.
172d0fb is described below
commit 172d0fb0767a4892d1b8770bfb6446bc297908ac
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 18 20:32:29 2021 +0100
CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.
---
.../resilience4j/ResilienceProcessor.java | 78 +++++++++++++++++-----
.../component/resilience4j/ResilienceReifier.java | 7 +-
.../model/Resilience4jConfigurationDefinition.java | 2 +-
3 files changed, 68 insertions(+), 19 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 2d8f366..c9bd5ca 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
@@ -66,7 +67,9 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private String id;
private final CircuitBreakerConfig circuitBreakerConfig;
private final BulkheadConfig bulkheadConfig;
+ private Bulkhead bulkhead;
private final TimeLimiterConfig timeLimiterConfig;
+ private TimeLimiter timeLimiter;
private final Processor processor;
private final Processor fallback;
private boolean shutdownExecutorService;
@@ -83,6 +86,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport
}
@Override
+ protected void doBuild() throws Exception {
+ super.doBuild();
+ if (timeLimiterConfig != null) {
+ timeLimiter = TimeLimiter.of(id, timeLimiterConfig);
+ }
+ if (bulkheadConfig != null) {
+ bulkhead = Bulkhead.of(id, bulkheadConfig);
+ }
+ }
+
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}
@@ -357,31 +371,39 @@ public class ResilienceProcessor extends AsyncProcessorSupport
Callable<Exchange> task;
- if (timeLimiterConfig != null) {
- // timeout handling is more complex with thread-pools
-
- TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig);
+ if (timeLimiter != null) {
Supplier<CompletableFuture<Exchange>> futureSupplier;
if (executorService == null) {
futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange));
} else {
futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService);
}
- task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier);
+ task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
} else {
task = new CircuitBreakerTask(() -> processInCopy(exchange));
}
- if (bulkheadConfig != null) {
- Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
- task = Bulkhead.decorateCallable(bh, task);
+ if (bulkhead != null) {
+ task = Bulkhead.decorateCallable(bulkhead, task);
}
task = CircuitBreaker.decorateCallable(circuitBreaker, task);
-
- Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.fallback, exchange);
- Try.ofCallable(task).recover(fallbackTask).andFinally(() -> callback.done(false)).get();
- return false;
+ Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id);
+ }
+ Try.ofCallable(task).recover(fallbackTask).get();
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ if (LOG.isTraceEnabled()) {
+ boolean failed = exchange.isFailed();
+ LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id,
+ failed);
+ }
+ callback.done(true);
+ return true;
}
private Exchange processInCopy(Exchange exchange) {
@@ -442,16 +464,23 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
+ private final String id;
private final Processor processor;
private final Exchange exchange;
- private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) {
+ private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) {
+ this.id = id;
this.processor = processor;
this.exchange = exchange;
}
@Override
public Exchange apply(Throwable throwable) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} recover task using circuit breaker: {} from: {}", exchange.getExchangeId(),
+ id, throwable);
+ }
+
if (processor == null) {
if (throwable instanceof TimeoutException) {
// the circuit breaker triggered a timeout (and there is no
@@ -464,14 +493,29 @@ public class ResilienceProcessor extends AsyncProcessorSupport
return exchange;
} else if (throwable instanceof CallNotPermittedException) {
// the circuit breaker triggered a call rejected
+ // where the circuit breaker is half-open / open and therefore
+ // we should just set properties and do not set any exception
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+ return exchange;
+ } else if (throwable instanceof BulkheadFullException) {
+ // the circuit breaker bulkhead is full
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
- throw RuntimeExchangeException.wrapRuntimeException(throwable);
+ exchange.setException(throwable);
+ return exchange;
} else {
- // throw exception so resilient4j know it was a failure
- throw RuntimeExchangeException.wrapRuntimeException(throwable);
+ // other kind of exception
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+ exchange.setException(throwable);
+ return exchange;
}
}
@@ -498,7 +542,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
// process the fallback until its fully done
processor.process(exchange);
LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
- } catch (Exception e) {
+ } catch (Throwable e) {
exchange.setException(e);
}
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 76c9597..c65eb29 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -120,7 +120,12 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
builder.maxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls()));
}
if (config.getBulkheadMaxWaitDuration() != null) {
- builder.maxWaitDuration(Duration.ofMillis(parseLong(config.getBulkheadMaxWaitDuration())));
+ long duration = parseLong(config.getBulkheadMaxWaitDuration());
+ if (duration <= 0) {
+ builder.maxWaitDuration(Duration.ZERO);
+ } else {
+ builder.maxWaitDuration(Duration.ofMillis(duration));
+ }
}
return builder.build();
}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 9fb6891..77468c2 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -198,7 +198,7 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati
* Configures the max amount of concurrent calls the bulkhead will support.
*/
public Resilience4jConfigurationDefinition bulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) {
- setBulkheadMaxWaitDuration(Integer.toString(bulkheadMaxConcurrentCalls));
+ setBulkheadMaxConcurrentCalls(Integer.toString(bulkheadMaxConcurrentCalls));
return this;
}