You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:44 UTC
[camel] 02/23: CAMEL-13691: camel-resilience4j - WIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3693d53fe526f5bcd194ca528e1000aed6d997ad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Nov 16 11:18:36 2019 +0100
CAMEL-13691: camel-resilience4j - WIP
---
apache-camel/pom.xml | 10 ++
apache-camel/src/main/descriptors/common-bin.xml | 2 +
components/camel-resilience4j/pom.xml | 10 ++
.../src/main/docs/resilience4j.adoc | 12 +-
.../resilience4j/ResilienceProcessor.java | 132 ++++++++++++++-------
.../component/resilience4j/ResilienceReifier.java | 4 +
docs/components/modules/ROOT/nav.adoc | 1 +
docs/components/modules/ROOT/pages/index.adoc | 4 +-
.../modules/ROOT/pages}/resilience4j.adoc | 13 +-
parent/pom.xml | 10 ++
10 files changed, 140 insertions(+), 58 deletions(-)
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index d691527..cd3c1b0 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -1228,6 +1228,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-resilience4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rest</artifactId>
<version>${project.version}</version>
</dependency>
@@ -2782,6 +2787,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-resilience4j-starter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rest-starter</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index fe4a531..213ce4f 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -266,6 +266,7 @@
<include>org.apache.camel:camel-reactive-streams</include>
<include>org.apache.camel:camel-reactor</include>
<include>org.apache.camel:camel-ref</include>
+ <include>org.apache.camel:camel-resilience4j</include>
<include>org.apache.camel:camel-rest</include>
<include>org.apache.camel:camel-rest-swagger</include>
<include>org.apache.camel:camel-ribbon</include>
@@ -616,6 +617,7 @@
<include>org.apache.camel:camel-reactive-streams-starter</include>
<include>org.apache.camel:camel-reactor-starter</include>
<include>org.apache.camel:camel-ref-starter</include>
+ <include>org.apache.camel:camel-resilience4j-starter</include>
<include>org.apache.camel:camel-rest-starter</include>
<include>org.apache.camel:camel-rest-swagger-starter</include>
<include>org.apache.camel:camel-ribbon-starter</include>
diff --git a/components/camel-resilience4j/pom.xml b/components/camel-resilience4j/pom.xml
index fa396b4..71f185f 100644
--- a/components/camel-resilience4j/pom.xml
+++ b/components/camel-resilience4j/pom.xml
@@ -50,6 +50,16 @@
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>1.1.0</version>
</dependency>
+ <dependency>
+ <groupId>io.github.resilience4j</groupId>
+ <artifactId>resilience4j-bulkhead</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.github.resilience4j</groupId>
+ <artifactId>resilience4j-timelimiter</artifactId>
+ <version>1.1.0</version>
+ </dependency>
<!-- for testing -->
<dependency>
diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/components/camel-resilience4j/src/main/docs/resilience4j.adoc
index 9afc96e..0bf1b56 100644
--- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc
+++ b/components/camel-resilience4j/src/main/docs/resilience4j.adoc
@@ -1,10 +1,10 @@
-= Hystrix Component
+= Resilience4j Component
-*Since Camel 2.18*
+*Since Camel 3.0*
-The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes.
+This component supports the Circuit Breaker EIP with the Resilience4j library.
-For more details see the Hystrix EIP documentation.
+For more details see the Circuit Breaker EIP documentation.
Maven users will need to add the following dependency to their `pom.xml`
for this component:
@@ -13,7 +13,7 @@ for this component:
----
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-hystrix</artifactId>
+ <artifactId>camel-resilience4j</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
@@ -28,7 +28,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s
----
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-hystrix-starter</artifactId>
+ <artifactId>camel-resilience4j-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index a9bd0e7..17be13f 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -18,7 +18,8 @@ package org.apache.camel.component.resilience4j;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Supplier;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.vavr.control.Try;
@@ -26,6 +27,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorSupport;
@@ -90,9 +92,50 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
// run this as if we run inside try .. catch so there is no regular Camel error handler
exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
+// Supplier<CompletableFuture<String>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> "Hello");
+// Callable<String> callable = TimeLimiter.decorateFutureSupplier(TimeLimiter.of(Duration.ofMillis(500)), futureSupplier);
+// String result = CircuitBreaker.decorateCheckedSupplier(cb, callable::call).apply();
+
+// Bulkhead bh = Bulkhead.ofDefaults("ddd");
+// BulkheadConfig.
+
+// TimeLimiter time = TimeLimiter.of(Duration.ofSeconds(1));
+// Supplier<Future<Exchange>> task2 = time.decorateFutureSupplier(() -> {
+// task.get();
+// Future
+// });
+
CircuitBreaker cb = CircuitBreaker.ofDefaults(id);
+ Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange));
+ Try.ofSupplier(task)
+ .recover(new CircuitBreakerFallbackTask(fallback, exchange))
+ .andFinally(() -> callback.done(false)).get();
+
+ return false;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+
+ private static class CircuitBreakerTask implements Supplier<Exchange> {
+
+ private final Processor processor;
+ private final Exchange exchange;
+
+ private CircuitBreakerTask(Processor processor, Exchange exchange) {
+ this.processor = processor;
+ this.exchange = exchange;
+ }
- Callable task = CircuitBreaker.decorateCallable(cb, () -> {
+ @Override
+ public Exchange get() {
try {
LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
// prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
@@ -113,53 +156,52 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
}
if (exchange.getException() != null) {
// throw exception so resilient4j know it was a failure
- throw exchange.getException();
+ throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());
}
- return null;
- });
-
- Try.ofCallable(task)
- .recover(f -> {
- if (fallback != null) {
- // store the last to endpoint as the failure endpoint
- if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
- exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
- }
- // give the rest of the pipeline another chance
- exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException());
- exchange.removeProperty(Exchange.ROUTE_STOP);
- exchange.setException(null);
- // and we should not be regarded as exhausted as we are in a try .. catch block
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
- // run the fallback processor
- try {
- LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange);
- // process the fallback until its fully done
- fallback.process(exchange);
- LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
- exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
- }
- return null;
- })
- .andFinally(() -> callback.done(false)).get();
-
- return false;
+ return exchange;
+ }
}
- @Override
- protected void doStart() throws Exception {
- // noop
- }
+ private static class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> {
- @Override
- protected void doStop() throws Exception {
- // noop
+ private final Processor processor;
+ private final Exchange exchange;
+
+ private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) {
+ this.processor = processor;
+ this.exchange = exchange;
+ }
+
+ @Override
+ public Exchange apply(Throwable throwable) {
+ if (processor != null) {
+ // store the last to endpoint as the failure endpoint
+ if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
+ // give the rest of the pipeline another chance
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException());
+ exchange.removeProperty(Exchange.ROUTE_STOP);
+ exchange.setException(null);
+ // and we should not be regarded as exhausted as we are in a try .. catch block
+ exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ // run the fallback processor
+ try {
+ LOG.debug("Running fallback: {} with exchange: {}", processor, exchange);
+ // process the fallback until its fully done
+ processor.process(exchange);
+ LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
+ }
+
+ return exchange;
+ }
}
}
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 1a07f57..4bd1278 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -23,6 +23,10 @@ import org.apache.camel.spi.RouteContext;
public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
+ // TODO: Resilience configuration in camel-core / model
+ // TODO: Timeout
+ // TODO: Bulkhead for viaNetwork
+
public ResilienceReifier(CircuitBreakerDefinition definition) {
super(definition);
}
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index 4473bc1..d53b85c 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -288,6 +288,7 @@
* xref:reactive-streams-component.adoc[Reactive Streams Component]
* xref:reactor.adoc[Reactor Component]
* xref:ref-component.adoc[Ref Component]
+* xref:resilience4j.adoc[Resilience4j Component]
* xref:rest-swagger-component.adoc[REST Swagger Component]
* xref:rest-api-component.adoc[REST API Component]
* xref:rest-component.adoc[REST Component]
diff --git a/docs/components/modules/ROOT/pages/index.adoc b/docs/components/modules/ROOT/pages/index.adoc
index e80b4f3..0c7a425 100644
--- a/docs/components/modules/ROOT/pages/index.adoc
+++ b/docs/components/modules/ROOT/pages/index.adoc
@@ -772,7 +772,7 @@ Number of Languages: 17 in 11 JAR artifacts (0 deprecated)
== Miscellaneous Components
// others: START
-Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated)
+Number of Miscellaneous Components: 39 in 39 JAR artifacts (0 deprecated)
[width="100%",cols="4,1,5",options="header"]
|===
@@ -808,6 +808,8 @@ Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated)
| xref:reactor.adoc[Reactor] (camel-reactor) | 2.20 | Reactor based back-end for Camel's reactive streams component
+| xref:resilience4j.adoc[Resilience4j] (camel-resilience4j) | 3.0 | Circuit Breaker EIP using Resilience4j
+
| xref:ribbon.adoc[Ribbon] (camel-ribbon) | 2.18 | Using Netflix Ribbon for client side load balancing
| xref:rxjava.adoc[RxJava] (camel-rxjava) | 2.22 | RxJava based back-end for Camel's reactive streams component
diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/docs/components/modules/ROOT/pages/resilience4j.adoc
similarity index 77%
copy from components/camel-resilience4j/src/main/docs/resilience4j.adoc
copy to docs/components/modules/ROOT/pages/resilience4j.adoc
index 9afc96e..6801015 100644
--- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc
+++ b/docs/components/modules/ROOT/pages/resilience4j.adoc
@@ -1,10 +1,11 @@
-= Hystrix Component
+= Resilience4j Component
+:page-source: components/camel-resilience4j/src/main/docs/resilience4j.adoc
-*Since Camel 2.18*
+*Since Camel 3.0*
-The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes.
+This component supports the Circuit Breaker EIP with the Resilience4j library.
-For more details see the Hystrix EIP documentation.
+For more details see the Circuit Breaker EIP documentation.
Maven users will need to add the following dependency to their `pom.xml`
for this component:
@@ -13,7 +14,7 @@ for this component:
----
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-hystrix</artifactId>
+ <artifactId>camel-resilience4j</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
@@ -28,7 +29,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s
----
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-hystrix-starter</artifactId>
+ <artifactId>camel-resilience4j-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index 6bbc2a4..4c02d65 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1963,6 +1963,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-resilience4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rest</artifactId>
<version>${project.version}</version>
</dependency>
@@ -3537,6 +3542,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-resilience4j-starter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rest-starter</artifactId>
<version>${project.version}</version>
</dependency>