You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/05 18:00:10 UTC

(camel) 01/01: CAMEL-20187: Add basic support of virtual threads

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 34f3abb8fbea9df27f69a9665c6bb6d41588626b
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100

    CAMEL-20187: Add basic support of virtual threads
---
 Jenkinsfile.jdk21                                  |   3 +-
 .../impl/DefaultExecutorServiceManagerTest.java    |   3 +
 .../camel/processor/WireTapAbortPolicyTest.java    |   3 +
 .../AggregateTimeoutWithNoExecutorServiceTest.java |   3 +
 .../DualManagedThreadPoolProfileTest.java          |   3 +
 .../DualManagedThreadPoolWithIdTest.java           |   3 +
 .../management/ManagedRouteRemoveWireTapTest.java  |   3 +
 .../management/ManagedThreadPoolProfileTest.java   |   3 +
 .../camel/management/ManagedThreadPoolTest.java    |   3 +
 .../management/ManagedThreadPoolWithIdTest.java    |   3 +
 core/camel-support/pom.xml                         |  49 +++++
 .../camel/support/DefaultThreadPoolFactory.java    | 208 +++++++++++++++++++++
 core/camel-util/pom.xml                            |  47 +++++
 .../util/concurrent/ThreadFactoryTypeAware.java    |  32 ++++
 .../apache/camel/util/concurrent/ThreadType.java   |  30 +++
 .../camel/util/concurrent/CamelThreadFactory.java  |  91 +++++++++
 .../apache/camel/util/concurrent/ThreadType.java   |  38 ++++
 .../modules/ROOT/pages/threading-model.adoc        |  12 ++
 18 files changed, 536 insertions(+), 1 deletion(-)

diff --git a/Jenkinsfile.jdk21 b/Jenkinsfile.jdk21
index 5c25cfd3973..b4a902dcf0c 100644
--- a/Jenkinsfile.jdk21
+++ b/Jenkinsfile.jdk21
@@ -43,6 +43,7 @@ pipeline {
 
     parameters {
         booleanParam(name: 'CLEAN', defaultValue: true, description: 'Perform the build in clean workspace')
+        booleanParam(name: 'VIRTUAL_THREAD', defaultValue: false, description: 'Perform the build using virtual threads')
     }
 
     stages {
@@ -73,7 +74,7 @@ pipeline {
         stage('Test') {
             steps {
                 timeout(unit: 'HOURS', time: 7) {
-                    sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify"
+                    sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify -Dcamel.threads.virtual.enabled=${params.VIRTUAL_THREAD}"
                 }
             }
             post {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 3913414d050..8d3f98ef00b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, ThreadPerTaskExecutor is created instead of ThreadPoolExecutor")
 public class DefaultExecutorServiceManagerTest extends ContextTestSupport {
 
     @Test
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 57260b4be7d..81b279f21dd 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -30,6 +30,7 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.parallel.Isolated;
 
 import static org.junit.jupiter.api.Assertions.fail;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 /**
  * Wire tap unit test
  */
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "Tasks are not rejected when using Virtual Threads")
 @Isolated
 public class WireTapAbortPolicyTest extends ContextTestSupport {
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
index cc838dbeb30..624bf630d7f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
@@ -21,9 +21,12 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the threads cannot be counted this way")
 public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport {
 
     @Test
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
index a17468632cf..79cbfe181c6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
@@ -23,12 +23,15 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class DualManagedThreadPoolProfileTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
index 6e1364609d7..71748076d8c 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class DualManagedThreadPoolWithIdTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
index 4bf24a0fe36..499a63d68cf 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
@@ -24,6 +24,7 @@ import javax.management.ObjectName;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
@@ -32,6 +33,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the thread pools are not ThreadPoolExecutor")
 @DisabledOnOs(OS.AIX)
 public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
index e58414bf5bc..f3b9efee03d 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
@@ -30,6 +31,8 @@ import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TY
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolProfileTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
index 83cfc322b08..ce8be83dbb6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
@@ -28,12 +28,15 @@ import javax.management.ReflectionException;
 import org.apache.camel.builder.RouteBuilder;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
index de542a796ca..9116e20f8ef 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolWithIdTest extends ManagementTestSupport {
 
diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..8fee4c9efd8
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService {
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory, Integer.MAX_VALUE).newCachedThreadPool(threadFactory);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+        // allow core thread timeout is default true if not configured
+        boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+        return newThreadPool(profile.getPoolSize(),
+                profile.getMaxPoolSize(),
+                profile.getKeepAliveTime(),
+                profile.getTimeUnit(),
+                profile.getMaxQueueSize(),
+                allow,
+                profile.getRejectedExecutionHandler(),
+                factory);
+    }
+
+    public ExecutorService newThreadPool(
+            int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+            boolean allowCoreThreadTimeOut,
+            RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+            throws IllegalArgumentException {
+        // the core pool size must be 0 or higher
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+        }
+
+        // validate max >= core
+        if (maxPoolSize < corePoolSize) {
+            throw new IllegalArgumentException(
+                    "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+        }
+        return ThreadPoolFactoryType.from(threadFactory, corePoolSize, maxPoolSize, maxQueueSize).newThreadPool(
+                corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+                rejectedExecutionHandler, threadFactory);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory, profile).newScheduledThreadPool(profile, threadFactory);
+    }
+
+    private enum ThreadPoolFactoryType {
+        PLATFORM {
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newCachedThreadPool(threadFactory);
+            }
+
+            ExecutorService newThreadPool(
+                    int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                    boolean allowCoreThreadTimeOut,
+                    RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                    throws IllegalArgumentException {
+
+                BlockingQueue<Runnable> workQueue;
+                if (corePoolSize == 0 && maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                    // and force 1 as pool size to be able to create the thread pool by the JDK
+                    corePoolSize = 1;
+                    maxPoolSize = 1;
+                } else if (maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                } else {
+                    // bounded task queue to store tasks on the queue
+                    workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+                }
+
+                ThreadPoolExecutor answer
+                        = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+                answer.setThreadFactory(threadFactory);
+                answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+                answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+                return answer;
+            }
+
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+
+                ScheduledThreadPoolExecutor answer
+                        = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+                answer.setRemoveOnCancelPolicy(true);
+
+                // need to wrap the thread pool in a sized to guard against the problem that the
+                // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+                // we could potentially keep adding tasks, and run out of memory.
+                if (profile.getMaxPoolSize() > 0) {
+                    return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+                } else {
+                    return answer;
+                }
+            }
+        },
+        VIRTUAL {
+            @Override
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+                                          int maxQueueSize, boolean allowCoreThreadTimeOut,
+                                          RejectedExecutionHandler rejectedExecutionHandler,
+                                          ThreadFactory threadFactory) throws IllegalArgumentException {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                return Executors.newScheduledThreadPool(0, threadFactory);
+            }
+        };
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, ThreadPoolProfile profile) {
+            return from(threadFactory, profile.getPoolSize(), profile.getMaxPoolSize(), profile.getMaxQueueSize());
+        }
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+            return from(threadFactory, corePoolSize == 0 && maxQueueSize <= 0 ? 1 : maxPoolSize);
+        }
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize) {
+            if (ThreadType.current() == ThreadType.PLATFORM) {
+                return ThreadPoolFactoryType.PLATFORM;
+            }
+            return maxPoolSize > 1 && threadFactory instanceof ThreadFactoryTypeAware factoryTypeAware && factoryTypeAware.isVirtual() ?
+                    ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+        }
+
+        abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+        abstract ExecutorService newThreadPool(
+                int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                boolean allowCoreThreadTimeOut,
+                RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                throws IllegalArgumentException;
+
+        abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+    }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
new file mode 100644
index 00000000000..39a7a398768
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The interface indicating whether the created threads are virtual or not.
+ */
+public interface ThreadFactoryTypeAware extends ThreadFactory {
+
+    /**
+     * Indicates whether the created threads are virtual.
+     *
+     * @return {@code true} if the created threads are virtual, {@code false} if they are platform threads.
+     */
+    boolean isVirtual();
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    public static ThreadType current() {
+        return PLATFORM;
+    }
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..d6c54ddc692
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to
+ * {@code true}.
+ */
+public final class CamelThreadFactory implements ThreadFactoryTypeAware {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+    private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+    private final String pattern;
+    private final String name;
+    private final boolean daemon;
+    private final ThreadFactoryType threadType;
+
+    public CamelThreadFactory(String pattern, String name, boolean daemon) {
+        this.pattern = pattern;
+        this.name = name;
+        this.daemon = daemon;
+        this.threadType = daemon ? TYPE : ThreadFactoryType.PLATFORM;
+    }
+
+    @Override
+    public boolean isVirtual() {
+        return threadType == ThreadFactoryType.VIRTUAL;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+        Thread answer = threadType.newThread(threadName, daemon, runnable);
+
+        LOG.trace("Created thread[{}] -> {}", threadName, answer);
+        return answer;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "CamelThreadFactory[" + name + "]";
+    }
+
+    private enum ThreadFactoryType {
+        PLATFORM {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofPlatform().name(threadName).daemon(daemon);
+            }
+        },
+        VIRTUAL {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofVirtual().name(threadName);
+            }
+        };
+
+        Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+            return newThreadBuilder(threadName, daemon).unstarted(runnable);
+        }
+
+        abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+        static ThreadFactoryType current() {
+            return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+        }
+    }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ * The default value is {@code false} which means that platform threads are used by default.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+    private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM;
+    static {
+        LOG.info("The type of thread detected is {}", CURRENT);
+    }
+    public static ThreadType current() {
+        return CURRENT;
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
index b71194857be..a8782e808f3 100644
--- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
+++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
@@ -241,3 +241,15 @@ you should implement and hook into the WorkManager.
 To hook in custom thread pool providers (e.g. for J2EE servers) a
 `ThreadPoolFactory` interface can be implemented. The implementation can
 be set in the `ExecutorServiceManager`.
+
+== Virtual Threads
+
+Starting from Java 21, the default `ThreadPoolFactory` can build `ExecutorService` and `ScheduledExecutorService` that
+use https://openjdk.org/jeps/425[virtual threads] instead of platform threads.
+But as it is an experimental feature, it is not enabled by default, you need to set the System property `camel.threads.virtual.enabled`
+to `true` and run Camel using Java 21 or above to enable it.
+
+Be aware that even if it is enabled, there are some use cases where platform threads are still used, for example, if the
+thread factory is configured to create non-daemon threads since virtual threads can only be daemons, or when the
+`ExecutorService` or `ScheduledExecutorService` to build cannot have more than one thread or finally when `corePoolSize`
+is set to zero and `maxQueueSize` is set to a value less or equal to `0`.