You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:54 UTC
[camel] 12/23: CAMEL-13691: camel-resilience4j - WIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2cf5880c599de47900bcb9dc616b96a64c65dc6f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 17 12:24:02 2019 +0100
CAMEL-13691: camel-resilience4j - WIP
---
.../resilience4j/ResilienceProcessor.java | 89 +++++++++++---
.../component/resilience4j/ResilienceReifier.java | 26 ++++-
.../resilience4j/HystrixCircuitOpenTest.java | 130 ---------------------
.../ResilienceTimeoutThreadPoolTest.java | 124 ++++++++++++++++++++
.../model/Resilience4jConfigurationCommon.java | 14 +++
.../model/Resilience4jConfigurationDefinition.java | 9 ++
.../main/Resilience4jConfigurationProperties.java | 22 ++++
.../camel-main-configuration-metadata.json | 8 +-
8 files changed, 272 insertions(+), 150 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index b4ab2ae..1b1a039 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -32,6 +34,8 @@ import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import io.vavr.control.Try;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
@@ -42,6 +46,7 @@ import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,17 +54,20 @@ import org.slf4j.LoggerFactory;
* Implementation of Circuit Breaker EIP using resilience4j.
*/
@ManagedResource(description = "Managed Resilience Processor")
-public class ResilienceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, org.apache.camel.Traceable, IdAware {
+public class ResilienceProcessor extends AsyncProcessorSupport implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class);
private volatile CircuitBreaker circuitBreaker;
+ private CamelContext camelContext;
private String id;
- private CircuitBreakerConfig circuitBreakerConfig;
- private BulkheadConfig bulkheadConfig;
- private TimeLimiterConfig timeLimiterConfig;
+ private final CircuitBreakerConfig circuitBreakerConfig;
+ private final BulkheadConfig bulkheadConfig;
+ private final TimeLimiterConfig timeLimiterConfig;
private final Processor processor;
private final Processor fallback;
+ private boolean shutdownExecutorService;
+ private ExecutorService executorService;
public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig,
Processor processor, Processor fallback) {
@@ -71,6 +79,16 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
}
@Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
public String getId() {
return id;
}
@@ -80,6 +98,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
this.id = id;
}
+ public boolean isShutdownExecutorService() {
+ return shutdownExecutorService;
+ }
+
+ public void setShutdownExecutorService(boolean shutdownExecutorService) {
+ this.shutdownExecutorService = shutdownExecutorService;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
@Override
public String getTraceLabel() {
return "resilience4j";
@@ -314,18 +348,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
task = Bulkhead.decorateCallable(bh, task);
}
- // timeout handling is more complex with thread-pools
- // TODO: Allow to plugin custom thread-pool instead of JDKs
+
if (timeLimiterConfig != null) {
- final Callable<Exchange> future = task;
- Supplier<CompletableFuture<Exchange>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> {
- try {
- return future.call();
- } catch (Exception e) {
- exchange.setException(e);
- }
- return exchange;
- });
+ // timeout handling is more complex with thread-pools
+ final CircuitBreakerTimeoutTask timeoutTask = new CircuitBreakerTimeoutTask(task, exchange);
+ Supplier<CompletableFuture<Exchange>> futureSupplier;
+ if (executorService == null) {
+ futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get);
+ } else {
+ futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get, executorService);
+ }
+
TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig);
task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier);
}
@@ -339,12 +372,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
@Override
protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext", this);
circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
}
@Override
protected void doStop() throws Exception {
- // noop
+ if (shutdownExecutorService && executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ }
}
private static class CircuitBreakerTask implements Callable<Exchange> {
@@ -441,4 +477,25 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
}
}
+ private static class CircuitBreakerTimeoutTask implements Supplier<Exchange> {
+
+ private final Callable<Exchange> future;
+ private final Exchange exchange;
+
+ private CircuitBreakerTimeoutTask(Callable<Exchange> future, Exchange exchange) {
+ this.future = future;
+ this.exchange = exchange;
+ }
+
+ @Override
+ public Exchange get() {
+ try {
+ return future.call();
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ return exchange;
+ }
+ }
+
}
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index da157fd..e9aa90c 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -20,6 +20,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
@@ -30,6 +31,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.Model;
+import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.Resilience4jConfigurationCommon;
import org.apache.camel.model.Resilience4jConfigurationDefinition;
import org.apache.camel.reifier.ProcessorReifier;
@@ -43,8 +45,6 @@ import static org.apache.camel.support.CamelContextHelper.mandatoryLookup;
public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
- // TODO: thread pool bulkhead
- // TODO: Configure timeout thread-pool globally
// TODO: spring-boot allow to configure via resilience4j-spring-boot
// TODO: example
// TODO: camel-main - configure hystrix/resilience/rest via java code fluent builder (does it work)
@@ -70,7 +70,9 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
BulkheadConfig bhConfig = configureBulkHead(config);
TimeLimiterConfig tlConfig = configureTimeLimiter(config);
- return new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback);
+ ResilienceProcessor answer = new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback);
+ configureTimeoutExecutorService(answer, routeContext, config);
+ return answer;
}
private CircuitBreakerConfig configureCircuitBreaker(Resilience4jConfigurationCommon config) {
@@ -138,6 +140,24 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
return builder.build();
}
+ private void configureTimeoutExecutorService(ResilienceProcessor processor, RouteContext routeContext, Resilience4jConfigurationCommon config) {
+ if (config.getTimeoutEnabled() == null || !config.getTimeoutEnabled()) {
+ return;
+ }
+
+ if (config.getTimeoutExecutorServiceRef() != null) {
+ String ref = config.getTimeoutExecutorServiceRef();
+ boolean shutdownThreadPool = false;
+ ExecutorService executorService = routeContext.lookup(ref, ExecutorService.class);
+ if (executorService == null) {
+ executorService = ProcessorDefinitionHelper.lookupExecutorServiceRef(routeContext, "CircuitBreaker", definition, ref);
+ shutdownThreadPool = true;
+ }
+ processor.setExecutorService(executorService);
+ processor.setShutdownExecutorService(shutdownThreadPool);
+ }
+ }
+
// *******************************
// Helpers
// *******************************
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java
deleted file mode 100644
index e9e7bcf..0000000
--- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.resilience4j;
-
-import java.io.IOException;
-
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED;
-import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION;
-
-@Ignore
-public class HystrixCircuitOpenTest extends CamelTestSupport {
- public static final Integer REQUEST_VOLUME_THRESHOLD = 4;
- private static final Logger LOG = LoggerFactory.getLogger(HystrixCircuitOpenTest.class);
-
- private HystrixExceptionRoute route = new HystrixExceptionRoute();
-
- @Test
- public void testCircuitOpen() throws Exception {
- LOG.info("testCircuitOpen start");
- // failing requests
- route.throwException = true;
- for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) {
- try {
- template.asyncRequestBody("direct:start", "Request Body");
- } catch (CamelExecutionException e) {
- LOG.info(e.toString());
- }
- }
- Thread.sleep(1500);
-
- resetMocks();
-
- // notice this can be flaky due timing when using thread sleeps in unit tests
- getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, true);
-
- route.throwException = false;
- try {
- template.requestBody("direct:start", "Request Body");
- LOG.info("Instead circuit open expected");
- } catch (CamelExecutionException e) {
- LOG.info("Circuit open expected ", e);
- }
-
- assertMockEndpointsSatisfied();
-
- // wait for the circuit to try an other request
- Thread.sleep(500);
- for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) {
- try {
- template.requestBody("direct:start", "Request Body");
- LOG.info("Circuit has closed");
- } catch (CamelExecutionException e) {
- Thread.sleep(i * 100);
- LOG.info("Circuit will be closed soon " + e.toString());
- }
- }
-
- resetMocks();
-
- getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, false);
- getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SUCCESSFUL_EXECUTION, true);
-
- template.requestBody("direct:start", "Request Body");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RoutesBuilder createRouteBuilder() throws Exception {
- return route;
- }
-
- class HystrixExceptionRoute extends RouteBuilder {
- volatile boolean throwException = true;
-
- @Override
- public void configure() throws Exception {
- from("direct:start")
- .circuitBreaker()
- .hystrixConfiguration()
- .executionTimeoutInMilliseconds(100)
- .circuitBreakerRequestVolumeThreshold(REQUEST_VOLUME_THRESHOLD)
- .metricsRollingStatisticalWindowInMilliseconds(1000)
- .circuitBreakerSleepWindowInMilliseconds(2000)
- .end()
- .log("Hystrix processing start: ${threadName}")
- .process(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- if (throwException) {
- LOG.info("Will throw exception");
- throw new IOException("Route has failed");
- } else {
- LOG.info("Will NOT throw exception");
- }
- }
- })
- .log("Hystrix processing end: ${threadName}")
- .end()
- .log(RESPONSE_SHORT_CIRCUITED + " = ${exchangeProperty." + RESPONSE_SHORT_CIRCUITED + "}")
- .to("mock:result");
- }
- }
-}
-
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java
new file mode 100644
index 0000000..58e0c33
--- /dev/null
+++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Resilience using timeout and custom thread pool with Java DSL
+ */
+public class ResilienceTimeoutThreadPoolTest extends CamelTestSupport {
+
+ @BindToRegistry
+ public ExecutorService myThreadPool() {
+ return context().getExecutorServiceManager().newFixedThreadPool(this, "myThreadPool", 2);
+ }
+
+ @Test
+ public void testFast() throws Exception {
+ // this calls the fast route and therefore we get a response
+ Object out = template.requestBody("direct:start", "fast");
+ assertEquals("Fast response", out);
+
+
+ ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class);
+ assertNotNull(pte);
+ assertEquals(2, pte.getCorePoolSize());
+ assertEquals(1, pte.getCompletedTaskCount());
+
+ assertFalse(pte.isShutdown());
+ }
+
+ @Test
+ public void testSlow() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ try {
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due to timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause());
+ }
+
+ ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class);
+ assertNotNull(pte);
+ assertEquals(2, pte.getCorePoolSize());
+ assertEquals(0, pte.getCompletedTaskCount());
+ assertEquals(1, pte.getActiveCount());
+
+ // stop camel and thread pool is also stopped
+ context().stop();
+
+ assertTrue(pte.isShutdown());
+ }
+
+ @Test
+ public void testSlowLoop() throws Exception {
+ // this calls the slow route and therefore causes a timeout which triggers an exception
+ for (int i = 0; i < 10; i++) {
+ try {
+ log.info(">>> test run " + i + " <<<");
+ template.requestBody("direct:start", "slow");
+ fail("Should fail due to timeout");
+ } catch (Exception e) {
+ // expected a timeout
+ assertIsInstanceOf(TimeoutException.class, e.getCause());
+ }
+ }
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .circuitBreaker()
+ // enable and use 2 second timeout
+ .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).timeoutExecutorServiceRef("myThreadPool").end()
+ .log("Resilience processing start: ${threadName}")
+ .toD("direct:${body}")
+ .log("Resilience processing end: ${threadName}")
+ .end()
+ .log("After Resilience ${body}");
+
+ from("direct:fast")
+ // this is a fast route and takes 1 second to respond
+ .log("Fast processing start: ${threadName}")
+ .delay(1000)
+ .transform().constant("Fast response")
+ .log("Fast processing end: ${threadName}");
+
+ from("direct:slow")
+ // this is a slow route and takes 3 second to respond
+ .log("Slow processing start: ${threadName}")
+ .delay(3000)
+ .transform().constant("Slow response")
+ .log("Slow processing end: ${threadName}");
+ }
+ };
+ }
+
+}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
index 208f8b0..decdd34 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.concurrent.ForkJoinPool;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -66,6 +67,8 @@ public class Resilience4jConfigurationCommon extends IdentifiedType {
private Integer bulkheadMaxWaitDuration;
@Metadata(label = "timeout", defaultValue = "false")
private Boolean timeoutEnabled;
+ @Metadata(label = "timeout")
+ private String timeoutExecutorServiceRef;
@Metadata(label = "timeout", defaultValue = "1000")
private Integer timeoutDuration;
@Metadata(label = "timeout", defaultValue = "true")
@@ -279,6 +282,17 @@ public class Resilience4jConfigurationCommon extends IdentifiedType {
this.timeoutEnabled = timeoutEnabled;
}
+ public String getTimeoutExecutorServiceRef() {
+ return timeoutExecutorServiceRef;
+ }
+
+ /**
+ * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+ */
+ public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+ this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+ }
+
public Integer getTimeoutDuration() {
return timeoutDuration;
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 76eadb0..54d4f0d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.concurrent.ForkJoinPool;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@@ -205,6 +206,14 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati
}
/**
+ * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+ */
+ public Resilience4jConfigurationDefinition timeoutExecutorServiceRef(String executorServiceRef) {
+ setTimeoutExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
+ /**
* Configures the thread execution timeout (millis).
* Default value is 1000 millis (1 second).
*/
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
index 81c1ca9..9292287 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.main;
+import java.util.concurrent.ForkJoinPool;
+
/**
* Global configuration for Resilience EIP circuit breaker.
*/
@@ -38,6 +40,7 @@ public class Resilience4jConfigurationProperties {
private Integer bulkheadMaxConcurrentCalls;
private Integer bulkheadMaxWaitDuration;
private Boolean timeoutEnabled;
+ private String timeoutExecutorServiceRef;
private Integer timeoutDuration;
private Boolean timeoutCancelRunningFuture;
@@ -256,6 +259,17 @@ public class Resilience4jConfigurationProperties {
this.timeoutEnabled = timeoutEnabled;
}
+ public String getTimeoutExecutorServiceRef() {
+ return timeoutExecutorServiceRef;
+ }
+
+ /**
+ * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+ */
+ public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+ this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+ }
+
public Integer getTimeoutDuration() {
return timeoutDuration;
}
@@ -437,6 +451,14 @@ public class Resilience4jConfigurationProperties {
}
/**
+ * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+ */
+ public Resilience4jConfigurationProperties withTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+ this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+ return this;
+ }
+
+ /**
* Configures the thread execution timeout (millis).
* Default value is 1000 millis (1 second).
*/
diff --git a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
index 6aac40d..983efdb 100644
--- a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
+++ b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
@@ -703,7 +703,7 @@
"name":"camel.resilience4j.timeout-duration",
"type":"java.lang.Integer",
"sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",
- "description":"Configures the thread execution timeout. Default value is 1 second."
+ "description":"Configures the thread execution timeout (millis). Default value is 1000 millis (1 second)."
},
{
"name":"camel.resilience4j.timeout-enabled",
@@ -712,6 +712,12 @@
"description":"Whether timeout is enabled or not on the circuit breaker. Default is false."
},
{
+ "name":"camel.resilience4j.timeout-executor-service-ref",
+ "type":"java.lang.String",
+ "sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",
+ "description":"References to a custom thread pool to use when timeout is enabled (uses ForkJoinPool#commonPool() by default)"
+ },
+ {
"name":"camel.resilience4j.wait-duration-in-open-state",
"type":"java.lang.Integer",
"sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",