You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:44 UTC

[camel] 02/23: CAMEL-13691: camel-resilience4j - WIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3693d53fe526f5bcd194ca528e1000aed6d997ad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Nov 16 11:18:36 2019 +0100

    CAMEL-13691: camel-resilience4j - WIP
---
 apache-camel/pom.xml                               |  10 ++
 apache-camel/src/main/descriptors/common-bin.xml   |   2 +
 components/camel-resilience4j/pom.xml              |  10 ++
 .../src/main/docs/resilience4j.adoc                |  12 +-
 .../resilience4j/ResilienceProcessor.java          | 132 ++++++++++++++-------
 .../component/resilience4j/ResilienceReifier.java  |   4 +
 docs/components/modules/ROOT/nav.adoc              |   1 +
 docs/components/modules/ROOT/pages/index.adoc      |   4 +-
 .../modules/ROOT/pages}/resilience4j.adoc          |  13 +-
 parent/pom.xml                                     |  10 ++
 10 files changed, 140 insertions(+), 58 deletions(-)

diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index d691527..cd3c1b0 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -1228,6 +1228,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-resilience4j</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-rest</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -2782,6 +2787,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-resilience4j-starter</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-rest-starter</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index fe4a531..213ce4f 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -266,6 +266,7 @@
         <include>org.apache.camel:camel-reactive-streams</include>
         <include>org.apache.camel:camel-reactor</include>
         <include>org.apache.camel:camel-ref</include>
+        <include>org.apache.camel:camel-resilience4j</include>
         <include>org.apache.camel:camel-rest</include>
         <include>org.apache.camel:camel-rest-swagger</include>
         <include>org.apache.camel:camel-ribbon</include>
@@ -616,6 +617,7 @@
         <include>org.apache.camel:camel-reactive-streams-starter</include>
         <include>org.apache.camel:camel-reactor-starter</include>
         <include>org.apache.camel:camel-ref-starter</include>
+        <include>org.apache.camel:camel-resilience4j-starter</include>
         <include>org.apache.camel:camel-rest-starter</include>
         <include>org.apache.camel:camel-rest-swagger-starter</include>
         <include>org.apache.camel:camel-ribbon-starter</include>
diff --git a/components/camel-resilience4j/pom.xml b/components/camel-resilience4j/pom.xml
index fa396b4..71f185f 100644
--- a/components/camel-resilience4j/pom.xml
+++ b/components/camel-resilience4j/pom.xml
@@ -50,6 +50,16 @@
             <artifactId>resilience4j-circuitbreaker</artifactId>
             <version>1.1.0</version>
         </dependency>
+        <dependency>
+            <groupId>io.github.resilience4j</groupId>
+            <artifactId>resilience4j-bulkhead</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.github.resilience4j</groupId>
+            <artifactId>resilience4j-timelimiter</artifactId>
+            <version>1.1.0</version>
+        </dependency>
 
         <!-- for testing -->
         <dependency>
diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/components/camel-resilience4j/src/main/docs/resilience4j.adoc
index 9afc96e..0bf1b56 100644
--- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc
+++ b/components/camel-resilience4j/src/main/docs/resilience4j.adoc
@@ -1,10 +1,10 @@
-= Hystrix Component
+= Resilience4j Component
 
-*Since Camel 2.18*
+*Since Camel 3.0*
 
-The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes.
+This component supports the Circuit Breaker EIP with the Resilience4j library.
 
-For more details see the Hystrix EIP documentation.
+For more details see the Circuit Breaker EIP documentation.
 
 Maven users will need to add the following dependency to their `pom.xml`
 for this component:
@@ -13,7 +13,7 @@ for this component:
 ----
 <dependency>
     <groupId>org.apache.camel</groupId>
-    <artifactId>camel-hystrix</artifactId>
+    <artifactId>camel-resilience4j</artifactId>
     <version>x.x.x</version>
     <!-- use the same version as your Camel core version -->
 </dependency>
@@ -28,7 +28,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s
 ----
 <dependency>
   <groupId>org.apache.camel</groupId>
-  <artifactId>camel-hystrix-starter</artifactId>
+  <artifactId>camel-resilience4j-starter</artifactId>
   <version>x.x.x</version>
   <!-- use the same version as your Camel core version -->
 </dependency>
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index a9bd0e7..17be13f 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -18,7 +18,8 @@ package org.apache.camel.component.resilience4j;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import io.github.resilience4j.circuitbreaker.CircuitBreaker;
 import io.vavr.control.Try;
@@ -26,6 +27,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -90,9 +92,50 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
         // run this as if we run inside try .. catch so there is no regular Camel error handler
         exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
 
+//        Supplier<CompletableFuture<String>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> "Hello");
+//        Callable<String> callable = TimeLimiter.decorateFutureSupplier(TimeLimiter.of(Duration.ofMillis(500)), futureSupplier);
+//        String result = CircuitBreaker.decorateCheckedSupplier(cb, callable::call).apply();
+
+//        Bulkhead bh = Bulkhead.ofDefaults("ddd");
+//        BulkheadConfig.
+
+//                TimeLimiter time = TimeLimiter.of(Duration.ofSeconds(1));
+//        Supplier<Future<Exchange>> task2 = time.decorateFutureSupplier(() -> {
+//            task.get();
+//            Future
+//        });
+
         CircuitBreaker cb = CircuitBreaker.ofDefaults(id);
+        Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange));
+        Try.ofSupplier(task)
+                .recover(new CircuitBreakerFallbackTask(fallback, exchange))
+                .andFinally(() -> callback.done(false)).get();
+
+        return false;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+    private static class CircuitBreakerTask implements Supplier<Exchange> {
+
+        private final Processor processor;
+        private final Exchange exchange;
+
+        private CircuitBreakerTask(Processor processor, Exchange exchange) {
+            this.processor = processor;
+            this.exchange = exchange;
+        }
 
-        Callable task = CircuitBreaker.decorateCallable(cb, () -> {
+        @Override
+        public Exchange get() {
             try {
                 LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
                 // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
@@ -113,53 +156,52 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
             }
             if (exchange.getException() != null) {
                 // throw exception so resilient4j know it was a failure
-                throw exchange.getException();
+                throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());
             }
-            return null;
-        });
-
-        Try.ofCallable(task)
-                .recover(f -> {
-                    if (fallback != null) {
-                        // store the last to endpoint as the failure endpoint
-                        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
-                        }
-                        // give the rest of the pipeline another chance
-                        exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
-                        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException());
-                        exchange.removeProperty(Exchange.ROUTE_STOP);
-                        exchange.setException(null);
-                        // and we should not be regarded as exhausted as we are in a try .. catch block
-                        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-                        // run the fallback processor
-                        try {
-                            LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
-                            // process the fallback until its fully done
-                            fallback.process(exchange);
-                            LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
-                        } catch (Exception e) {
-                            exchange.setException(e);
-                        }
-
-                        exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
-                        exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
-                    }
-                    return null;
-                })
-                .andFinally(() -> callback.done(false)).get();
-
-        return false;
+            return exchange;
+        }
     }
 
-    @Override
-    protected void doStart() throws Exception {
-        // noop
-    }
+    private static class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
 
-    @Override
-    protected void doStop() throws Exception {
-        // noop
+        private final Processor processor;
+        private final Exchange exchange;
+
+        private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) {
+            this.processor = processor;
+            this.exchange = exchange;
+        }
+
+        @Override
+        public Exchange apply(Throwable throwable) {
+            if (processor != null) {
+                // store the last to endpoint as the failure endpoint
+                if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+                }
+                // give the rest of the pipeline another chance
+                exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+                exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException());
+                exchange.removeProperty(Exchange.ROUTE_STOP);
+                exchange.setException(null);
+                // and we should not be regarded as exhausted as we are in a try .. catch block
+                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+                // run the fallback processor
+                try {
+                    LOG.debug("Running fallback: {} with exchange: {}", processor, exchange);
+                    // process the fallback until its fully done
+                    processor.process(exchange);
+                    LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+
+                exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+                exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
+            }
+
+            return exchange;
+        }
     }
 
 }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 1a07f57..4bd1278 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -23,6 +23,10 @@ import org.apache.camel.spi.RouteContext;
 
 public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
 
+    // TODO: Resilience configuration in camel-core / model
+    // TODO: Timeout
+    // TODO: Bulkhead for viaNetwork
+
     public ResilienceReifier(CircuitBreakerDefinition definition) {
         super(definition);
     }
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index 4473bc1..d53b85c 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -288,6 +288,7 @@
 * xref:reactive-streams-component.adoc[Reactive Streams Component]
 * xref:reactor.adoc[Reactor Component]
 * xref:ref-component.adoc[Ref Component]
+* xref:resilience4j.adoc[Resilience4j Component]
 * xref:rest-swagger-component.adoc[REST Swagger Component]
 * xref:rest-api-component.adoc[REST API Component]
 * xref:rest-component.adoc[REST Component]
diff --git a/docs/components/modules/ROOT/pages/index.adoc b/docs/components/modules/ROOT/pages/index.adoc
index e80b4f3..0c7a425 100644
--- a/docs/components/modules/ROOT/pages/index.adoc
+++ b/docs/components/modules/ROOT/pages/index.adoc
@@ -772,7 +772,7 @@ Number of Languages: 17 in 11 JAR artifacts (0 deprecated)
 == Miscellaneous Components
 
 // others: START
-Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated)
+Number of Miscellaneous Components: 39 in 39 JAR artifacts (0 deprecated)
 
 [width="100%",cols="4,1,5",options="header"]
 |===
@@ -808,6 +808,8 @@ Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated)
 
 | xref:reactor.adoc[Reactor] (camel-reactor) | 2.20 | Reactor based back-end for Camel's reactive streams component
 
+| xref:resilience4j.adoc[Resilience4j] (camel-resilience4j) | 3.0 | Circuit Breaker EIP using Resilience4j
+
 | xref:ribbon.adoc[Ribbon] (camel-ribbon) | 2.18 | Using Netflix Ribbon for client side load balancing
 
 | xref:rxjava.adoc[RxJava] (camel-rxjava) | 2.22 | RxJava based back-end for Camel's reactive streams component
diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/docs/components/modules/ROOT/pages/resilience4j.adoc
similarity index 77%
copy from components/camel-resilience4j/src/main/docs/resilience4j.adoc
copy to docs/components/modules/ROOT/pages/resilience4j.adoc
index 9afc96e..6801015 100644
--- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc
+++ b/docs/components/modules/ROOT/pages/resilience4j.adoc
@@ -1,10 +1,11 @@
-= Hystrix Component
+= Resilience4j Component
+:page-source: components/camel-resilience4j/src/main/docs/resilience4j.adoc
 
-*Since Camel 2.18*
+*Since Camel 3.0*
 
-The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes.
+This component supports the Circuit Breaker EIP with the Resilience4j library.
 
-For more details see the Hystrix EIP documentation.
+For more details see the Circuit Breaker EIP documentation.
 
 Maven users will need to add the following dependency to their `pom.xml`
 for this component:
@@ -13,7 +14,7 @@ for this component:
 ----
 <dependency>
     <groupId>org.apache.camel</groupId>
-    <artifactId>camel-hystrix</artifactId>
+    <artifactId>camel-resilience4j</artifactId>
     <version>x.x.x</version>
     <!-- use the same version as your Camel core version -->
 </dependency>
@@ -28,7 +29,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s
 ----
 <dependency>
   <groupId>org.apache.camel</groupId>
-  <artifactId>camel-hystrix-starter</artifactId>
+  <artifactId>camel-resilience4j-starter</artifactId>
   <version>x.x.x</version>
   <!-- use the same version as your Camel core version -->
 </dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index 6bbc2a4..4c02d65 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1963,6 +1963,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-resilience4j</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-rest</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -3537,6 +3542,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-resilience4j-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-rest-starter</artifactId>
         <version>${project.version}</version>
       </dependency>