You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:47 UTC
[camel] 05/23: CAMEL-13691: camel-resilience4j - WIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f3e97b122212c2427e91ffdbf36a0950cbb64514
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Nov 16 13:23:47 2019 +0100
CAMEL-13691: camel-resilience4j - WIP
---
.../resilience4j/ResilienceProcessor.java | 11 +--
.../component/resilience4j/ResilienceReifier.java | 104 ++++++++++++++++++++-
2 files changed, 106 insertions(+), 9 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 1a6dbef..91fa38c 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.component.resilience4j;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
-import io.github.resilience4j.timelimiter.TimeLimiter;
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.vavr.control.Try;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -48,11 +45,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class);
private String id;
+ private CircuitBreakerConfig config;
private final Processor processor;
private final Processor fallback;
private final boolean fallbackViaNetwork;
- public ResilienceProcessor(Processor processor, Processor fallback, boolean fallbackViaNetwork) {
+ public ResilienceProcessor(CircuitBreakerConfig config, Processor processor, Processor fallback, boolean fallbackViaNetwork) {
+ this.config = config;
this.processor = processor;
this.fallback = fallback;
this.fallbackViaNetwork = fallbackViaNetwork;
@@ -109,7 +108,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
// Future
// });
- CircuitBreaker cb = CircuitBreaker.ofDefaults(id);
+ CircuitBreaker cb = CircuitBreaker.of(id, config);
Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange));
Try.ofSupplier(task)
.recover(new CircuitBreakerFallbackTask(fallback, exchange))
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 4bd1278..3704219 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -16,14 +16,31 @@
*/
package org.apache.camel.component.resilience4j;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.model.Model;
+import org.apache.camel.model.Resilience4jConfigurationCommon;
+import org.apache.camel.model.Resilience4jConfigurationDefinition;
import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.function.Suppliers;
+
+import static org.apache.camel.support.CamelContextHelper.lookup;
+import static org.apache.camel.support.CamelContextHelper.mandatoryLookup;
public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
- // TODO: Resilience configuration in camel-core / model
// TODO: Timeout
// TODO: Bulkhead for viaNetwork
@@ -40,9 +57,90 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
fallback = ProcessorReifier.reifier(definition.getOnFallback()).createProcessor(routeContext);
}
- final String id = getId(definition, routeContext);
+ final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(routeContext.getCamelContext());
+ CircuitBreakerConfig cfg = configureResilience4j(config);
+
+ return new ResilienceProcessor(cfg, processor, fallback, false);
+ }
+
+ private CircuitBreakerConfig configureResilience4j(Resilience4jConfigurationCommon config) {
+ CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom();
+ if (config.getAutomaticTransitionFromOpenToHalfOpenEnabled() != null) {
+ builder.automaticTransitionFromOpenToHalfOpenEnabled(config.getAutomaticTransitionFromOpenToHalfOpenEnabled());
+ }
+ if (config.getFailureRateThreshold() != null) {
+ builder.failureRateThreshold(config.getFailureRateThreshold());
+ }
+ if (config.getMinimumNumberOfCalls() != null) {
+ builder.minimumNumberOfCalls(config.getMinimumNumberOfCalls());
+ }
+ if (config.getPermittedNumberOfCallsInHalfOpenState() != null) {
+ builder.permittedNumberOfCallsInHalfOpenState(config.getPermittedNumberOfCallsInHalfOpenState());
+ }
+ if (config.getSlidingWindowSize() != null) {
+ builder.slidingWindowSize(config.getSlidingWindowSize());
+ }
+ if (config.getSlidingWindowType() != null) {
+ builder.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.valueOf(config.getSlidingWindowType()));
+ }
+ if (config.getSlowCallDurationThreshold() != null) {
+ builder.slowCallDurationThreshold(Duration.ofSeconds(config.getSlowCallDurationThreshold()));
+ }
+ if (config.getSlowCallRateThreshold() != null) {
+ builder.slowCallRateThreshold(config.getSlowCallRateThreshold());
+ }
+ if (config.getWaitDurationInOpenState() != null) {
+ builder.waitDurationInOpenState(Duration.ofSeconds(config.getWaitDurationInOpenState()));
+ }
+ if (config.getWritableStackTraceEnabled() != null) {
+ builder.writableStackTraceEnabled(config.getWritableStackTraceEnabled());
+ }
+ return builder.build();
+ }
+
+ // *******************************
+ // Helpers
+ // *******************************
+
+ Resilience4jConfigurationDefinition buildResilience4jConfiguration(CamelContext camelContext) throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Extract properties from default configuration, the one configured on
+ // camel context takes the precedence over those in the registry
+ loadProperties(camelContext, properties, Suppliers.firstNotNull(
+ () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(null),
+ () -> lookup(camelContext, "Camel", Resilience4jConfigurationDefinition.class))
+ );
+
+ // Extract properties from referenced configuration, the one configured
+ // on camel context takes the precedence over those in the registry
+ if (definition.getConfigurationRef() != null) {
+ final String ref = definition.getConfigurationRef();
+
+ loadProperties(camelContext, properties, Suppliers.firstNotNull(
+ () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(ref),
+ () -> mandatoryLookup(camelContext, ref, Resilience4jConfigurationDefinition.class))
+ );
+ }
+
+ // Extract properties from local configuration
+ loadProperties(camelContext, properties, Optional.ofNullable(definition.getResilience4jConfiguration()));
+
+ // Extract properties from definition
+ BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+ beanIntrospection.getProperties(definition, properties, null, false);
+
+ Resilience4jConfigurationDefinition config = new Resilience4jConfigurationDefinition();
+
+ // Apply properties to a new configuration
+ PropertyBindingSupport.bindProperties(camelContext, config, properties);
+
+ return config;
+ }
- return new ResilienceProcessor(processor, fallback, false);
+ private void loadProperties(CamelContext camelContext, Map<String, Object> properties, Optional<?> optional) {
+ BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+ optional.ifPresent(bean -> beanIntrospection.getProperties(bean, properties, null, false));
}
}