You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/05 18:00:09 UTC

(camel) branch virtual-threads-basic-support updated (3699aacf146 -> 34f3abb8fbe)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git


    omit 3699aacf146 CAMEL-20187: Add basic support of virtual threads
     new 34f3abb8fbe CAMEL-20187: Add basic support of virtual threads

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3699aacf146)
            \
             N -- N -- N   refs/heads/virtual-threads-basic-support (34f3abb8fbe)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/management/DualManagedThreadPoolProfileTest.java  | 3 +++
 .../org/apache/camel/management/DualManagedThreadPoolWithIdTest.java   | 3 +++
 .../org/apache/camel/management/ManagedRouteRemoveWireTapTest.java     | 3 +++
 .../java/org/apache/camel/management/ManagedThreadPoolProfileTest.java | 3 +++
 .../test/java/org/apache/camel/management/ManagedThreadPoolTest.java   | 3 +++
 .../java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java  | 3 +++
 6 files changed, 18 insertions(+)


(camel) 01/01: CAMEL-20187: Add basic support of virtual threads

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 34f3abb8fbea9df27f69a9665c6bb6d41588626b
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100

    CAMEL-20187: Add basic support of virtual threads
---
 Jenkinsfile.jdk21                                  |   3 +-
 .../impl/DefaultExecutorServiceManagerTest.java    |   3 +
 .../camel/processor/WireTapAbortPolicyTest.java    |   3 +
 .../AggregateTimeoutWithNoExecutorServiceTest.java |   3 +
 .../DualManagedThreadPoolProfileTest.java          |   3 +
 .../DualManagedThreadPoolWithIdTest.java           |   3 +
 .../management/ManagedRouteRemoveWireTapTest.java  |   3 +
 .../management/ManagedThreadPoolProfileTest.java   |   3 +
 .../camel/management/ManagedThreadPoolTest.java    |   3 +
 .../management/ManagedThreadPoolWithIdTest.java    |   3 +
 core/camel-support/pom.xml                         |  49 +++++
 .../camel/support/DefaultThreadPoolFactory.java    | 208 +++++++++++++++++++++
 core/camel-util/pom.xml                            |  47 +++++
 .../util/concurrent/ThreadFactoryTypeAware.java    |  32 ++++
 .../apache/camel/util/concurrent/ThreadType.java   |  30 +++
 .../camel/util/concurrent/CamelThreadFactory.java  |  91 +++++++++
 .../apache/camel/util/concurrent/ThreadType.java   |  38 ++++
 .../modules/ROOT/pages/threading-model.adoc        |  12 ++
 18 files changed, 536 insertions(+), 1 deletion(-)

diff --git a/Jenkinsfile.jdk21 b/Jenkinsfile.jdk21
index 5c25cfd3973..b4a902dcf0c 100644
--- a/Jenkinsfile.jdk21
+++ b/Jenkinsfile.jdk21
@@ -43,6 +43,7 @@ pipeline {
 
     parameters {
         booleanParam(name: 'CLEAN', defaultValue: true, description: 'Perform the build in clean workspace')
+        booleanParam(name: 'VIRTUAL_THREAD', defaultValue: false, description: 'Perform the build using virtual threads')
     }
 
     stages {
@@ -73,7 +74,7 @@ pipeline {
         stage('Test') {
             steps {
                 timeout(unit: 'HOURS', time: 7) {
-                    sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify"
+                    sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify -Dcamel.threads.virtual.enabled=${params.VIRTUAL_THREAD}"
                 }
             }
             post {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 3913414d050..8d3f98ef00b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, ThreadPerTaskExecutor is created instead of ThreadPoolExecutor")
 public class DefaultExecutorServiceManagerTest extends ContextTestSupport {
 
     @Test
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 57260b4be7d..81b279f21dd 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -30,6 +30,7 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.parallel.Isolated;
 
 import static org.junit.jupiter.api.Assertions.fail;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 /**
  * Wire tap unit test
  */
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "Tasks are not rejected when using Virtual Threads")
 @Isolated
 public class WireTapAbortPolicyTest extends ContextTestSupport {
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
index cc838dbeb30..624bf630d7f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
@@ -21,9 +21,12 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the threads cannot be counted this way")
 public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport {
 
     @Test
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
index a17468632cf..79cbfe181c6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
@@ -23,12 +23,15 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class DualManagedThreadPoolProfileTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
index 6e1364609d7..71748076d8c 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class DualManagedThreadPoolWithIdTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
index 4bf24a0fe36..499a63d68cf 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
@@ -24,6 +24,7 @@ import javax.management.ObjectName;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
@@ -32,6 +33,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the thread pools are not ThreadPoolExecutor")
 @DisabledOnOs(OS.AIX)
 public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
index e58414bf5bc..f3b9efee03d 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
@@ -30,6 +31,8 @@ import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TY
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolProfileTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
index 83cfc322b08..ce8be83dbb6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
@@ -28,12 +28,15 @@ import javax.management.ReflectionException;
 import org.apache.camel.builder.RouteBuilder;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolTest extends ManagementTestSupport {
 
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
index de542a796ca..9116e20f8ef 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+                          disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
 @DisabledOnOs(OS.AIX)
 public class ManagedThreadPoolWithIdTest extends ManagementTestSupport {
 
diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..8fee4c9efd8
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService {
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory, Integer.MAX_VALUE).newCachedThreadPool(threadFactory);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+        // allow core thread timeout is default true if not configured
+        boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+        return newThreadPool(profile.getPoolSize(),
+                profile.getMaxPoolSize(),
+                profile.getKeepAliveTime(),
+                profile.getTimeUnit(),
+                profile.getMaxQueueSize(),
+                allow,
+                profile.getRejectedExecutionHandler(),
+                factory);
+    }
+
+    public ExecutorService newThreadPool(
+            int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+            boolean allowCoreThreadTimeOut,
+            RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+            throws IllegalArgumentException {
+        // the core pool size must be 0 or higher
+        if (corePoolSize < 0) {
+            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+        }
+
+        // validate max >= core
+        if (maxPoolSize < corePoolSize) {
+            throw new IllegalArgumentException(
+                    "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+        }
+        return ThreadPoolFactoryType.from(threadFactory, corePoolSize, maxPoolSize, maxQueueSize).newThreadPool(
+                corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+                rejectedExecutionHandler, threadFactory);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory, profile).newScheduledThreadPool(profile, threadFactory);
+    }
+
+    private enum ThreadPoolFactoryType {
+        PLATFORM {
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newCachedThreadPool(threadFactory);
+            }
+
+            ExecutorService newThreadPool(
+                    int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                    boolean allowCoreThreadTimeOut,
+                    RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                    throws IllegalArgumentException {
+
+                BlockingQueue<Runnable> workQueue;
+                if (corePoolSize == 0 && maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                    // and force 1 as pool size to be able to create the thread pool by the JDK
+                    corePoolSize = 1;
+                    maxPoolSize = 1;
+                } else if (maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                } else {
+                    // bounded task queue to store tasks on the queue
+                    workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+                }
+
+                ThreadPoolExecutor answer
+                        = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+                answer.setThreadFactory(threadFactory);
+                answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+                answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+                return answer;
+            }
+
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+
+                ScheduledThreadPoolExecutor answer
+                        = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+                answer.setRemoveOnCancelPolicy(true);
+
+                // need to wrap the thread pool in a sized to guard against the problem that the
+                // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+                // we could potentially keep adding tasks, and run out of memory.
+                if (profile.getMaxPoolSize() > 0) {
+                    return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+                } else {
+                    return answer;
+                }
+            }
+        },
+        VIRTUAL {
+            @Override
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+                                          int maxQueueSize, boolean allowCoreThreadTimeOut,
+                                          RejectedExecutionHandler rejectedExecutionHandler,
+                                          ThreadFactory threadFactory) throws IllegalArgumentException {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                return Executors.newScheduledThreadPool(0, threadFactory);
+            }
+        };
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, ThreadPoolProfile profile) {
+            return from(threadFactory, profile.getPoolSize(), profile.getMaxPoolSize(), profile.getMaxQueueSize());
+        }
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+            return from(threadFactory, corePoolSize == 0 && maxQueueSize <= 0 ? 1 : maxPoolSize);
+        }
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize) {
+            if (ThreadType.current() == ThreadType.PLATFORM) {
+                return ThreadPoolFactoryType.PLATFORM;
+            }
+            return maxPoolSize > 1 && threadFactory instanceof ThreadFactoryTypeAware factoryTypeAware && factoryTypeAware.isVirtual() ?
+                    ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+        }
+
+        abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+        abstract ExecutorService newThreadPool(
+                int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                boolean allowCoreThreadTimeOut,
+                RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                throws IllegalArgumentException;
+
+        abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+    }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
new file mode 100644
index 00000000000..39a7a398768
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The interface indicating whether the created threads are virtual or not.
+ */
+public interface ThreadFactoryTypeAware extends ThreadFactory {
+
+    /**
+     * Indicates whether the created threads are virtual.
+     *
+     * @return {@code true} if the created threads are virtual, {@code false} if they are platform threads.
+     */
+    boolean isVirtual();
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    public static ThreadType current() {
+        return PLATFORM;
+    }
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..d6c54ddc692
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to
+ * {@code true}.
+ */
+public final class CamelThreadFactory implements ThreadFactoryTypeAware {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+    private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+    private final String pattern;
+    private final String name;
+    private final boolean daemon;
+    private final ThreadFactoryType threadType;
+
+    public CamelThreadFactory(String pattern, String name, boolean daemon) {
+        this.pattern = pattern;
+        this.name = name;
+        this.daemon = daemon;
+        this.threadType = daemon ? TYPE : ThreadFactoryType.PLATFORM;
+    }
+
+    @Override
+    public boolean isVirtual() {
+        return threadType == ThreadFactoryType.VIRTUAL;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+        Thread answer = threadType.newThread(threadName, daemon, runnable);
+
+        LOG.trace("Created thread[{}] -> {}", threadName, answer);
+        return answer;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "CamelThreadFactory[" + name + "]";
+    }
+
+    private enum ThreadFactoryType {
+        PLATFORM {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofPlatform().name(threadName).daemon(daemon);
+            }
+        },
+        VIRTUAL {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofVirtual().name(threadName);
+            }
+        };
+
+        Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+            return newThreadBuilder(threadName, daemon).unstarted(runnable);
+        }
+
+        abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+        static ThreadFactoryType current() {
+            return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+        }
+    }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ * The default value is {@code false} which means that platform threads are used by default.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+    private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM;
+    static {
+        LOG.info("The type of thread detected is {}", CURRENT);
+    }
+    public static ThreadType current() {
+        return CURRENT;
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
index b71194857be..a8782e808f3 100644
--- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
+++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
@@ -241,3 +241,15 @@ you should implement and hook into the WorkManager.
 To hook in custom thread pool providers (e.g. for J2EE servers) a
 `ThreadPoolFactory` interface can be implemented. The implementation can
 be set in the `ExecutorServiceManager`.
+
+== Virtual Threads
+
+Starting from Java 21, the default `ThreadPoolFactory` can build `ExecutorService` and `ScheduledExecutorService` that
+use https://openjdk.org/jeps/425[virtual threads] instead of platform threads.
+But as it is an experimental feature, it is not enabled by default, you need to set the System property `camel.threads.virtual.enabled`
+to `true` and run Camel using Java 21 or above to enable it.
+
+Be aware that even if it is enabled, there are some use cases where platform threads are still used, for example, if the
+thread factory is configured to create non-daemon threads since virtual threads can only be daemons, or when the
+`ExecutorService` or `ScheduledExecutorService` to build cannot have more than one thread or finally when `corePoolSize`
+is set to zero and `maxQueueSize` is set to a value less or equal to `0`.