You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:54 UTC

[camel] 12/23: CAMEL-13691: camel-resilience4j - WIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2cf5880c599de47900bcb9dc616b96a64c65dc6f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 17 12:24:02 2019 +0100

    CAMEL-13691: camel-resilience4j - WIP
---
 .../resilience4j/ResilienceProcessor.java          |  89 +++++++++++---
 .../component/resilience4j/ResilienceReifier.java  |  26 ++++-
 .../resilience4j/HystrixCircuitOpenTest.java       | 130 ---------------------
 .../ResilienceTimeoutThreadPoolTest.java           | 124 ++++++++++++++++++++
 .../model/Resilience4jConfigurationCommon.java     |  14 +++
 .../model/Resilience4jConfigurationDefinition.java |   9 ++
 .../main/Resilience4jConfigurationProperties.java  |  22 ++++
 .../camel-main-configuration-metadata.json         |   8 +-
 8 files changed, 272 insertions(+), 150 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index b4ab2ae..1b1a039 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -32,6 +34,8 @@ import io.github.resilience4j.timelimiter.TimeLimiter;
 import io.github.resilience4j.timelimiter.TimeLimiterConfig;
 import io.vavr.control.Try;
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -42,6 +46,7 @@ import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,17 +54,20 @@ import org.slf4j.LoggerFactory;
  * Implementation of Circuit Breaker EIP using resilience4j.
  */
 @ManagedResource(description = "Managed Resilience Processor")
-public class ResilienceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, org.apache.camel.Traceable, IdAware {
+public class ResilienceProcessor extends AsyncProcessorSupport implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class);
 
     private volatile CircuitBreaker circuitBreaker;
+    private CamelContext camelContext;
     private String id;
-    private CircuitBreakerConfig circuitBreakerConfig;
-    private BulkheadConfig bulkheadConfig;
-    private TimeLimiterConfig timeLimiterConfig;
+    private final CircuitBreakerConfig circuitBreakerConfig;
+    private final BulkheadConfig bulkheadConfig;
+    private final TimeLimiterConfig timeLimiterConfig;
     private final Processor processor;
     private final Processor fallback;
+    private boolean shutdownExecutorService;
+    private ExecutorService executorService;
 
     public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig,
                                Processor processor, Processor fallback) {
@@ -71,6 +79,16 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
     }
 
     @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
     public String getId() {
         return id;
     }
@@ -80,6 +98,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
         this.id = id;
     }
 
+    public boolean isShutdownExecutorService() {
+        return shutdownExecutorService;
+    }
+
+    public void setShutdownExecutorService(boolean shutdownExecutorService) {
+        this.shutdownExecutorService = shutdownExecutorService;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
     @Override
     public String getTraceLabel() {
         return "resilience4j";
@@ -314,18 +348,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
             Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
             task = Bulkhead.decorateCallable(bh, task);
         }
-        // timeout handling is more complex with thread-pools
-        // TODO: Allow to plugin custom thread-pool instead of JDKs
+
         if (timeLimiterConfig != null) {
-            final Callable<Exchange> future = task;
-            Supplier<CompletableFuture<Exchange>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> {
-                try {
-                    return future.call();
-                } catch (Exception e) {
-                    exchange.setException(e);
-                }
-                return exchange;
-            });
+            // timeout handling is more complex with thread-pools
+            final CircuitBreakerTimeoutTask timeoutTask = new CircuitBreakerTimeoutTask(task, exchange);
+            Supplier<CompletableFuture<Exchange>> futureSupplier;
+            if (executorService == null) {
+                futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get);
+            } else {
+                futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get, executorService);
+            }
+
             TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig);
             task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier);
         }
@@ -339,12 +372,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
 
     @Override
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
         circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
     }
 
     @Override
     protected void doStop() throws Exception {
-        // noop
+        if (shutdownExecutorService && executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
     }
 
     private static class CircuitBreakerTask implements Callable<Exchange> {
@@ -441,4 +477,25 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga
         }
     }
 
+    private static class CircuitBreakerTimeoutTask implements Supplier<Exchange>  {
+
+        private final Callable<Exchange> future;
+        private final Exchange exchange;
+
+        private CircuitBreakerTimeoutTask(Callable<Exchange> future, Exchange exchange) {
+            this.future = future;
+            this.exchange = exchange;
+        }
+
+        @Override
+        public Exchange get() {
+            try {
+                return future.call();
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+            return exchange;
+        }
+    }
+
 }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index da157fd..e9aa90c 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -20,6 +20,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 
 import io.github.resilience4j.bulkhead.BulkheadConfig;
 import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
@@ -30,6 +31,7 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.model.CircuitBreakerDefinition;
 import org.apache.camel.model.Model;
+import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.Resilience4jConfigurationCommon;
 import org.apache.camel.model.Resilience4jConfigurationDefinition;
 import org.apache.camel.reifier.ProcessorReifier;
@@ -43,8 +45,6 @@ import static org.apache.camel.support.CamelContextHelper.mandatoryLookup;
 
 public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> {
 
-    // TODO: thread pool bulkhead
-    // TODO: Configure timeout thread-pool globally
     // TODO: spring-boot allow to configure via resilience4j-spring-boot
     // TODO: example
     // TODO: camel-main - configure hystrix/resilience/rest via java code fluent builder (does it work)
@@ -70,7 +70,9 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
         BulkheadConfig bhConfig = configureBulkHead(config);
         TimeLimiterConfig tlConfig = configureTimeLimiter(config);
 
-        return new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback);
+        ResilienceProcessor answer = new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback);
+        configureTimeoutExecutorService(answer, routeContext, config);
+        return answer;
     }
 
     private CircuitBreakerConfig configureCircuitBreaker(Resilience4jConfigurationCommon config) {
@@ -138,6 +140,24 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition
         return builder.build();
     }
 
+    private void configureTimeoutExecutorService(ResilienceProcessor processor, RouteContext routeContext, Resilience4jConfigurationCommon config) {
+        if (config.getTimeoutEnabled() == null || !config.getTimeoutEnabled()) {
+            return;
+        }
+
+        if (config.getTimeoutExecutorServiceRef() != null) {
+            String ref = config.getTimeoutExecutorServiceRef();
+            boolean shutdownThreadPool = false;
+            ExecutorService executorService = routeContext.lookup(ref, ExecutorService.class);
+            if (executorService == null) {
+                executorService = ProcessorDefinitionHelper.lookupExecutorServiceRef(routeContext, "CircuitBreaker", definition, ref);
+                shutdownThreadPool = true;
+            }
+            processor.setExecutorService(executorService);
+            processor.setShutdownExecutorService(shutdownThreadPool);
+        }
+    }
+
     // *******************************
     // Helpers
     // *******************************
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java
deleted file mode 100644
index e9e7bcf..0000000
--- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.resilience4j;
-
-import java.io.IOException;
-
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED;
-import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION;
-
-@Ignore
-public class HystrixCircuitOpenTest extends CamelTestSupport {
-    public static final Integer REQUEST_VOLUME_THRESHOLD = 4;
-    private static final Logger LOG = LoggerFactory.getLogger(HystrixCircuitOpenTest.class);
-
-    private HystrixExceptionRoute route = new HystrixExceptionRoute();
-
-    @Test
-    public void testCircuitOpen() throws Exception {
-        LOG.info("testCircuitOpen start");
-        // failing requests
-        route.throwException = true;
-        for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) {
-            try {
-                template.asyncRequestBody("direct:start", "Request Body");
-            } catch (CamelExecutionException e) {
-                LOG.info(e.toString());
-            }
-        }
-        Thread.sleep(1500);
-
-        resetMocks();
-
-        // notice this can be flaky due timing when using thread sleeps in unit tests
-        getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, true);
-
-        route.throwException = false;
-        try {
-            template.requestBody("direct:start", "Request Body");
-            LOG.info("Instead circuit open expected");
-        } catch (CamelExecutionException e) {
-            LOG.info("Circuit open expected ", e);
-        }
-
-        assertMockEndpointsSatisfied();
-
-        // wait for the circuit to try an other request
-        Thread.sleep(500);
-        for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) {
-            try {
-                template.requestBody("direct:start", "Request Body");
-                LOG.info("Circuit has closed");
-            } catch (CamelExecutionException e) {
-                Thread.sleep(i * 100);
-                LOG.info("Circuit will be closed soon " + e.toString());
-            }
-        }
-
-        resetMocks();
-
-        getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, false);
-        getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SUCCESSFUL_EXECUTION, true);
-
-        template.requestBody("direct:start", "Request Body");
-
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RoutesBuilder createRouteBuilder() throws Exception {
-        return route;
-    }
-
-    class HystrixExceptionRoute extends RouteBuilder {
-        volatile boolean throwException = true;
-
-        @Override
-        public void configure() throws Exception {
-            from("direct:start")
-                .circuitBreaker()
-                    .hystrixConfiguration()
-                        .executionTimeoutInMilliseconds(100)
-                        .circuitBreakerRequestVolumeThreshold(REQUEST_VOLUME_THRESHOLD)
-                        .metricsRollingStatisticalWindowInMilliseconds(1000)
-                        .circuitBreakerSleepWindowInMilliseconds(2000)
-                    .end()
-                    .log("Hystrix processing start: ${threadName}")
-                    .process(new Processor() {
-                        @Override
-                        public void process(Exchange exchange) throws Exception {
-                            if (throwException) {
-                                LOG.info("Will throw exception");
-                                throw new IOException("Route has failed");
-                            } else {
-                                LOG.info("Will NOT throw exception");
-                            }
-                        }
-                    })
-                    .log("Hystrix processing end: ${threadName}")
-                .end()
-                .log(RESPONSE_SHORT_CIRCUITED + " = ${exchangeProperty." + RESPONSE_SHORT_CIRCUITED + "}")
-                .to("mock:result");
-        }
-    }
-}
-
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java
new file mode 100644
index 0000000..58e0c33
--- /dev/null
+++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Resilience using timeout and custom thread pool with Java DSL
+ */
+public class ResilienceTimeoutThreadPoolTest extends CamelTestSupport {
+
+    @BindToRegistry
+    public ExecutorService myThreadPool() {
+        return context().getExecutorServiceManager().newFixedThreadPool(this, "myThreadPool", 2);
+    }
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+
+
+        ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class);
+        assertNotNull(pte);
+        assertEquals(2, pte.getCorePoolSize());
+        assertEquals(1, pte.getCompletedTaskCount());
+
+        assertFalse(pte.isShutdown());
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        try {
+            template.requestBody("direct:start", "slow");
+            fail("Should fail due to timeout");
+        } catch (Exception e) {
+            // expected a timeout
+            assertIsInstanceOf(TimeoutException.class, e.getCause());
+        }
+
+        ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class);
+        assertNotNull(pte);
+        assertEquals(2, pte.getCorePoolSize());
+        assertEquals(0, pte.getCompletedTaskCount());
+        assertEquals(1, pte.getActiveCount());
+
+        // stop camel and thread pool is also stopped
+        context().stop();
+
+        assertTrue(pte.isShutdown());
+    }
+
+    @Test
+    public void testSlowLoop() throws Exception {
+        // this calls the slow route and therefore causes a timeout which triggers an exception
+        for (int i = 0; i < 10; i++) {
+            try {
+                log.info(">>> test run " + i + " <<<");
+                template.requestBody("direct:start", "slow");
+                fail("Should fail due to timeout");
+            } catch (Exception e) {
+                // expected a timeout
+                assertIsInstanceOf(TimeoutException.class, e.getCause());
+            }
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .circuitBreaker()
+                        // enable and use 2 second timeout
+                        .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).timeoutExecutorServiceRef("myThreadPool").end()
+                        .log("Resilience processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Resilience processing end: ${threadName}")
+                    .end()
+                    .log("After Resilience ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
index 208f8b0..decdd34 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.ForkJoinPool;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -66,6 +67,8 @@ public class Resilience4jConfigurationCommon extends IdentifiedType {
     private Integer bulkheadMaxWaitDuration;
     @Metadata(label = "timeout", defaultValue = "false")
     private Boolean timeoutEnabled;
+    @Metadata(label = "timeout")
+    private String timeoutExecutorServiceRef;
     @Metadata(label = "timeout", defaultValue = "1000")
     private Integer timeoutDuration;
     @Metadata(label = "timeout", defaultValue = "true")
@@ -279,6 +282,17 @@ public class Resilience4jConfigurationCommon extends IdentifiedType {
         this.timeoutEnabled = timeoutEnabled;
     }
 
+    public String getTimeoutExecutorServiceRef() {
+        return timeoutExecutorServiceRef;
+    }
+
+    /**
+     * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+     */
+    public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+        this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+    }
+
     public Integer getTimeoutDuration() {
         return timeoutDuration;
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 76eadb0..54d4f0d 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.concurrent.ForkJoinPool;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
@@ -205,6 +206,14 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati
     }
 
     /**
+     * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+     */
+    public Resilience4jConfigurationDefinition timeoutExecutorServiceRef(String executorServiceRef) {
+        setTimeoutExecutorServiceRef(executorServiceRef);
+        return this;
+    }
+
+    /**
      * Configures the thread execution timeout (millis).
      * Default value is 1000 millis (1 second).
      */
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
index 81c1ca9..9292287 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.main;
 
+import java.util.concurrent.ForkJoinPool;
+
 /**
  * Global configuration for Resilience EIP circuit breaker.
  */
@@ -38,6 +40,7 @@ public class Resilience4jConfigurationProperties {
     private Integer bulkheadMaxConcurrentCalls;
     private Integer bulkheadMaxWaitDuration;
     private Boolean timeoutEnabled;
+    private String timeoutExecutorServiceRef;
     private Integer timeoutDuration;
     private Boolean timeoutCancelRunningFuture;
 
@@ -256,6 +259,17 @@ public class Resilience4jConfigurationProperties {
         this.timeoutEnabled = timeoutEnabled;
     }
 
+    public String getTimeoutExecutorServiceRef() {
+        return timeoutExecutorServiceRef;
+    }
+
+    /**
+     * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+     */
+    public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+        this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+    }
+
     public Integer getTimeoutDuration() {
         return timeoutDuration;
     }
@@ -437,6 +451,14 @@ public class Resilience4jConfigurationProperties {
     }
 
     /**
+     * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default)
+     */
+    public Resilience4jConfigurationProperties withTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) {
+        this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+        return this;
+    }
+
+    /**
      * Configures the thread execution timeout (millis).
      * Default value is 1000 millis (1 second).
      */
diff --git a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
index 6aac40d..983efdb 100644
--- a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
+++ b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json
@@ -703,7 +703,7 @@
 			"name":"camel.resilience4j.timeout-duration",
 			"type":"java.lang.Integer",
 			"sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",
-			"description":"Configures the thread execution timeout. Default value is 1 second."
+			"description":"Configures the thread execution timeout (millis). Default value is 1000 millis (1 second)."
 		},
 		{
 			"name":"camel.resilience4j.timeout-enabled",
@@ -712,6 +712,12 @@
 			"description":"Whether timeout is enabled or not on the circuit breaker. Default is false."
 		},
 		{
+			"name":"camel.resilience4j.timeout-executor-service-ref",
+			"type":"java.lang.String",
+			"sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",
+			"description":"References to a custom thread pool to use when timeout is enabled (uses ForkJoinPool#commonPool() by default)"
+		},
+		{
 			"name":"camel.resilience4j.wait-duration-in-open-state",
 			"type":"java.lang.Integer",
 			"sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",