You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:47 UTC

[camel] 05/23: CAMEL-13691: camel-resilience4j - WIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f3e97b122212c2427e91ffdbf36a0950cbb64514
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Nov 16 13:23:47 2019 +0100

    CAMEL-13691: camel-resilience4j - WIP
---
 .../resilience4j/ResilienceProcessor.java          |  11 +--
 .../component/resilience4j/ResilienceReifier.java  | 104 ++++++++++++++++++++-
 2 files changed, 106 insertions(+), 9 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 1a6dbef..91fa38c 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.resilience4j;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import io.github.resilience4j.circuitbreaker.CircuitBreaker;
-import io.github.resilience4j.timelimiter.TimeLimiter;
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
 import io.vavr.control.Try;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -48,11 +45,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
     private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class);
 
     private String id;
+    private CircuitBreakerConfig config;
     private final Processor processor;
     private final Processor fallback;
     private final boolean fallbackViaNetwork;
 
-    public ResilienceProcessor(Processor processor, Processor fallback, boolean fallbackViaNetwork) {
+    public ResilienceProcessor(CircuitBreakerConfig config, Processor processor, Processor fallback, boolean fallbackViaNetwork) {
+        this.config = config;
         this.processor = processor;
         this.fallback = fallback;
         this.fallbackViaNetwork = fallbackViaNetwork;
@@ -109,7 +108,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
 //            Future
 //        });
 
-        CircuitBreaker cb = CircuitBreaker.ofDefaults(id);
+        CircuitBreaker cb = CircuitBreaker.of(id, config);
         Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange));
         Try.ofSupplier(task)
                 .recover(new CircuitBreakerFallbackTask(fallback, exchange))
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 4bd1278..3704219 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -16,14 +16,31 @@
  */
 package org.apache.camel.component.resilience4j;
 
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.model.Model;
+import org.apache.camel.model.Resilience4jConfigurationCommon;
+import org.apache.camel.model.Resilience4jConfigurationDefinition;
 import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.BeanIntrospection;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.function.Suppliers;
+
+import static org.apache.camel.support.CamelContextHelper.lookup;
+import static org.apache.camel.support.CamelContextHelper.mandatoryLookup;
 
 public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
 
-    // TODO: Resilience configuration in camel-core / model
     // TODO: Timeout
     // TODO: Bulkhead for viaNetwork
 
@@ -40,9 +57,90 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
             fallback = ProcessorReifier.reifier(definition.getOnFallback()).createProcessor(routeContext);
         }
 
-        final String id = getId(definition, routeContext);
+        final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(routeContext.getCamelContext());
+        CircuitBreakerConfig cfg = configureResilience4j(config);
+
+        return new ResilienceProcessor(cfg, processor, fallback, false);
+    }
+
+    private CircuitBreakerConfig configureResilience4j(Resilience4jConfigurationCommon config) {
+        CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom();
+        if (config.getAutomaticTransitionFromOpenToHalfOpenEnabled() != null) {
+            builder.automaticTransitionFromOpenToHalfOpenEnabled(config.getAutomaticTransitionFromOpenToHalfOpenEnabled());
+        }
+        if (config.getFailureRateThreshold() != null) {
+            builder.failureRateThreshold(config.getFailureRateThreshold());
+        }
+        if (config.getMinimumNumberOfCalls() != null) {
+            builder.minimumNumberOfCalls(config.getMinimumNumberOfCalls());
+        }
+        if (config.getPermittedNumberOfCallsInHalfOpenState() != null) {
+            builder.permittedNumberOfCallsInHalfOpenState(config.getPermittedNumberOfCallsInHalfOpenState());
+        }
+        if (config.getSlidingWindowSize() != null) {
+            builder.slidingWindowSize(config.getSlidingWindowSize());
+        }
+        if (config.getSlidingWindowType() != null) {
+            builder.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.valueOf(config.getSlidingWindowType()));
+        }
+        if (config.getSlowCallDurationThreshold() != null) {
+            builder.slowCallDurationThreshold(Duration.ofSeconds(config.getSlowCallDurationThreshold()));
+        }
+        if (config.getSlowCallRateThreshold() != null) {
+            builder.slowCallRateThreshold(config.getSlowCallRateThreshold());
+        }
+        if (config.getWaitDurationInOpenState() != null) {
+            builder.waitDurationInOpenState(Duration.ofSeconds(config.getWaitDurationInOpenState()));
+        }
+        if (config.getWritableStackTraceEnabled() != null) {
+            builder.writableStackTraceEnabled(config.getWritableStackTraceEnabled());
+        }
+        return builder.build();
+    }
+
+    // *******************************
+    // Helpers
+    // *******************************
+
+    Resilience4jConfigurationDefinition buildResilience4jConfiguration(CamelContext camelContext) throws Exception {
+        Map<String, Object> properties = new HashMap<>();
+
+        // Extract properties from default configuration, the one configured on
+        // camel context takes the precedence over those in the registry
+        loadProperties(camelContext, properties, Suppliers.firstNotNull(
+                () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(null),
+                () -> lookup(camelContext, "Camel", Resilience4jConfigurationDefinition.class))
+        );
+
+        // Extract properties from referenced configuration, the one configured
+        // on camel context takes the precedence over those in the registry
+        if (definition.getConfigurationRef() != null) {
+            final String ref = definition.getConfigurationRef();
+
+            loadProperties(camelContext, properties, Suppliers.firstNotNull(
+                    () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(ref),
+                    () -> mandatoryLookup(camelContext, ref, Resilience4jConfigurationDefinition.class))
+            );
+        }
+
+        // Extract properties from local configuration
+        loadProperties(camelContext, properties, Optional.ofNullable(definition.getResilience4jConfiguration()));
+
+        // Extract properties from definition
+        BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+        beanIntrospection.getProperties(definition, properties, null, false);
+
+        Resilience4jConfigurationDefinition config = new Resilience4jConfigurationDefinition();
+
+        // Apply properties to a new configuration
+        PropertyBindingSupport.bindProperties(camelContext, config, properties);
+
+        return config;
+    }
 
-        return new ResilienceProcessor(processor, fallback, false);
+    private void loadProperties(CamelContext camelContext, Map<String, Object> properties, Optional<?> optional) {
+        BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+        optional.ifPresent(bean -> beanIntrospection.getProperties(bean, properties, null, false));
     }
 
 }