You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/05 18:00:10 UTC
(camel) 01/01: CAMEL-20187: Add basic support of virtual threads
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 34f3abb8fbea9df27f69a9665c6bb6d41588626b
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100
CAMEL-20187: Add basic support of virtual threads
---
Jenkinsfile.jdk21 | 3 +-
.../impl/DefaultExecutorServiceManagerTest.java | 3 +
.../camel/processor/WireTapAbortPolicyTest.java | 3 +
.../AggregateTimeoutWithNoExecutorServiceTest.java | 3 +
.../DualManagedThreadPoolProfileTest.java | 3 +
.../DualManagedThreadPoolWithIdTest.java | 3 +
.../management/ManagedRouteRemoveWireTapTest.java | 3 +
.../management/ManagedThreadPoolProfileTest.java | 3 +
.../camel/management/ManagedThreadPoolTest.java | 3 +
.../management/ManagedThreadPoolWithIdTest.java | 3 +
core/camel-support/pom.xml | 49 +++++
.../camel/support/DefaultThreadPoolFactory.java | 208 +++++++++++++++++++++
core/camel-util/pom.xml | 47 +++++
.../util/concurrent/ThreadFactoryTypeAware.java | 32 ++++
.../apache/camel/util/concurrent/ThreadType.java | 30 +++
.../camel/util/concurrent/CamelThreadFactory.java | 91 +++++++++
.../apache/camel/util/concurrent/ThreadType.java | 38 ++++
.../modules/ROOT/pages/threading-model.adoc | 12 ++
18 files changed, 536 insertions(+), 1 deletion(-)
diff --git a/Jenkinsfile.jdk21 b/Jenkinsfile.jdk21
index 5c25cfd3973..b4a902dcf0c 100644
--- a/Jenkinsfile.jdk21
+++ b/Jenkinsfile.jdk21
@@ -43,6 +43,7 @@ pipeline {
parameters {
booleanParam(name: 'CLEAN', defaultValue: true, description: 'Perform the build in clean workspace')
+ booleanParam(name: 'VIRTUAL_THREAD', defaultValue: false, description: 'Perform the build using virtual threads')
}
stages {
@@ -73,7 +74,7 @@ pipeline {
stage('Test') {
steps {
timeout(unit: 'HOURS', time: 7) {
- sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify"
+ sh "./mvnw $MAVEN_PARAMS $MAVEN_TEST_PARAMS -Darchetype.test.skip -Dmaven.test.failure.ignore=true -Dcheckstyle.skip=true verify -Dcamel.threads.virtual.enabled=${params.VIRTUAL_THREAD}"
}
}
post {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 3913414d050..8d3f98ef00b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, ThreadPerTaskExecutor is created instead of ThreadPoolExecutor")
public class DefaultExecutorServiceManagerTest extends ContextTestSupport {
@Test
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 57260b4be7d..81b279f21dd 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -30,6 +30,7 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.parallel.Isolated;
import static org.junit.jupiter.api.Assertions.fail;
@@ -37,6 +38,8 @@ import static org.junit.jupiter.api.Assertions.fail;
/**
* Wire tap unit test
*/
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "Tasks are not rejected when using Virtual Threads")
@Isolated
public class WireTapAbortPolicyTest extends ContextTestSupport {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
index cc838dbeb30..624bf630d7f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutWithNoExecutorServiceTest.java
@@ -21,9 +21,12 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the threads cannot be counted this way")
public class AggregateTimeoutWithNoExecutorServiceTest extends ContextTestSupport {
@Test
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
index a17468632cf..79cbfe181c6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolProfileTest.java
@@ -23,12 +23,15 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
@DisabledOnOs(OS.AIX)
public class DualManagedThreadPoolProfileTest extends ManagementTestSupport {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
index 6e1364609d7..71748076d8c 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/DualManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
@DisabledOnOs(OS.AIX)
public class DualManagedThreadPoolWithIdTest extends ManagementTestSupport {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
index 4bf24a0fe36..499a63d68cf 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapTest.java
@@ -24,6 +24,7 @@ import javax.management.ObjectName;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -32,6 +33,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the thread pools are not ThreadPoolExecutor")
@DisabledOnOs(OS.AIX)
public class ManagedRouteRemoveWireTapTest extends ManagementTestSupport {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
index e58414bf5bc..f3b9efee03d 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolProfileTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -30,6 +31,8 @@ import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TY
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
@DisabledOnOs(OS.AIX)
public class ManagedThreadPoolProfileTest extends ManagementTestSupport {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
index 83cfc322b08..ce8be83dbb6 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java
@@ -28,12 +28,15 @@ import javax.management.ReflectionException;
import org.apache.camel.builder.RouteBuilder;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
@DisabledOnOs(OS.AIX)
public class ManagedThreadPoolTest extends ManagementTestSupport {
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
index de542a796ca..9116e20f8ef 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThreadPoolWithIdTest.java
@@ -21,12 +21,15 @@ import javax.management.ObjectName;
import org.apache.camel.builder.RouteBuilder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@DisabledIfSystemProperty(named = "camel.threads.virtual.enabled", matches = "true",
+ disabledReason = "In case of Virtual Threads, the created thread pools don't have all these attributes")
@DisabledOnOs(OS.AIX)
public class ManagedThreadPoolWithIdTest extends ManagementTestSupport {
diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>java-21-sources</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin-version}</version>
+ <executions>
+ <execution>
+ <id>default-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>compile-java-21</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <release>21</release>
+ <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+ <multiReleaseOutput>true</multiReleaseOutput>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Multi-Release>true</Multi-Release>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..8fee4c9efd8
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService {
+
+ private CamelContext camelContext;
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return ThreadPoolFactoryType.from(threadFactory, Integer.MAX_VALUE).newCachedThreadPool(threadFactory);
+ }
+
+ @Override
+ public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+ // allow core thread timeout is default true if not configured
+ boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+ return newThreadPool(profile.getPoolSize(),
+ profile.getMaxPoolSize(),
+ profile.getKeepAliveTime(),
+ profile.getTimeUnit(),
+ profile.getMaxQueueSize(),
+ allow,
+ profile.getRejectedExecutionHandler(),
+ factory);
+ }
+
+ public ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException {
+ // the core pool size must be 0 or higher
+ if (corePoolSize < 0) {
+ throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+ }
+
+ // validate max >= core
+ if (maxPoolSize < corePoolSize) {
+ throw new IllegalArgumentException(
+ "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+ }
+ return ThreadPoolFactoryType.from(threadFactory, corePoolSize, maxPoolSize, maxQueueSize).newThreadPool(
+ corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+ rejectedExecutionHandler, threadFactory);
+ }
+
+ @Override
+ public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ return ThreadPoolFactoryType.from(threadFactory, profile).newScheduledThreadPool(profile, threadFactory);
+ }
+
+ private enum ThreadPoolFactoryType {
+ PLATFORM {
+ ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return Executors.newCachedThreadPool(threadFactory);
+ }
+
+ ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException {
+
+ BlockingQueue<Runnable> workQueue;
+ if (corePoolSize == 0 && maxQueueSize <= 0) {
+ // use a synchronous queue for direct-handover (no tasks stored on the queue)
+ workQueue = new SynchronousQueue<>();
+ // and force 1 as pool size to be able to create the thread pool by the JDK
+ corePoolSize = 1;
+ maxPoolSize = 1;
+ } else if (maxQueueSize <= 0) {
+ // use a synchronous queue for direct-handover (no tasks stored on the queue)
+ workQueue = new SynchronousQueue<>();
+ } else {
+ // bounded task queue to store tasks on the queue
+ workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+ }
+
+ ThreadPoolExecutor answer
+ = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+ answer.setThreadFactory(threadFactory);
+ answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+ if (rejectedExecutionHandler == null) {
+ rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+ }
+ answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+ return answer;
+ }
+
+ ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+ if (rejectedExecutionHandler == null) {
+ rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+ }
+
+ ScheduledThreadPoolExecutor answer
+ = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+ answer.setRemoveOnCancelPolicy(true);
+
+ // need to wrap the thread pool in a sized to guard against the problem that the
+ // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+ // we could potentially keep adding tasks, and run out of memory.
+ if (profile.getMaxPoolSize() > 0) {
+ return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+ } else {
+ return answer;
+ }
+ }
+ },
+ VIRTUAL {
+ @Override
+ ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return Executors.newThreadPerTaskExecutor(threadFactory);
+ }
+
+ @Override
+ ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+ int maxQueueSize, boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler,
+ ThreadFactory threadFactory) throws IllegalArgumentException {
+ return Executors.newThreadPerTaskExecutor(threadFactory);
+ }
+
+ @Override
+ ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ return Executors.newScheduledThreadPool(0, threadFactory);
+ }
+ };
+
+ static ThreadPoolFactoryType from(ThreadFactory threadFactory, ThreadPoolProfile profile) {
+ return from(threadFactory, profile.getPoolSize(), profile.getMaxPoolSize(), profile.getMaxQueueSize());
+ }
+
+ static ThreadPoolFactoryType from(ThreadFactory threadFactory, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+ return from(threadFactory, corePoolSize == 0 && maxQueueSize <= 0 ? 1 : maxPoolSize);
+ }
+
+ static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize) {
+ if (ThreadType.current() == ThreadType.PLATFORM) {
+ return ThreadPoolFactoryType.PLATFORM;
+ }
+ return maxPoolSize > 1 && threadFactory instanceof ThreadFactoryTypeAware factoryTypeAware && factoryTypeAware.isVirtual() ?
+ ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+ }
+
+ abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+ abstract ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException;
+
+ abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+ }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>java-21-sources</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin-version}</version>
+ <executions>
+ <execution>
+ <id>default-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>compile-java-21</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <release>21</release>
+ <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+ <multiReleaseOutput>true</multiReleaseOutput>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Multi-Release>true</Multi-Release>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
new file mode 100644
index 00000000000..39a7a398768
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadFactoryTypeAware.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * The interface indicating whether the created threads are virtual or not.
+ */
+public interface ThreadFactoryTypeAware extends ThreadFactory {
+
+ /**
+ * Indicates whether the created threads are virtual.
+ *
+ * @return {@code true} if the created threads are virtual, {@code false} if they are platform threads.
+ */
+ boolean isVirtual();
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ */
+public enum ThreadType {
+ PLATFORM,
+ VIRTUAL;
+
+ public static ThreadType current() {
+ return PLATFORM;
+ }
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..d6c54ddc692
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to
+ * {@code true}.
+ */
+public final class CamelThreadFactory implements ThreadFactoryTypeAware {
+ private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+ private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+ private final String pattern;
+ private final String name;
+ private final boolean daemon;
+ private final ThreadFactoryType threadType;
+
+ public CamelThreadFactory(String pattern, String name, boolean daemon) {
+ this.pattern = pattern;
+ this.name = name;
+ this.daemon = daemon;
+ this.threadType = daemon ? TYPE : ThreadFactoryType.PLATFORM;
+ }
+
+ @Override
+ public boolean isVirtual() {
+ return threadType == ThreadFactoryType.VIRTUAL;
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+ Thread answer = threadType.newThread(threadName, daemon, runnable);
+
+ LOG.trace("Created thread[{}] -> {}", threadName, answer);
+ return answer;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return "CamelThreadFactory[" + name + "]";
+ }
+
+ private enum ThreadFactoryType {
+ PLATFORM {
+ Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+ return Thread.ofPlatform().name(threadName).daemon(daemon);
+ }
+ },
+ VIRTUAL {
+ Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+ return Thread.ofVirtual().name(threadName);
+ }
+ };
+
+ Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+ return newThreadBuilder(threadName, daemon).unstarted(runnable);
+ }
+
+ abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+ static ThreadFactoryType current() {
+ return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+ }
+ }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ * The default value is {@code false} which means that platform threads are used by default.
+ */
+public enum ThreadType {
+ PLATFORM,
+ VIRTUAL;
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+ private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM;
+ static {
+ LOG.info("The type of thread detected is {}", CURRENT);
+ }
+ public static ThreadType current() {
+ return CURRENT;
+ }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
index b71194857be..a8782e808f3 100644
--- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc
+++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc
@@ -241,3 +241,15 @@ you should implement and hook into the WorkManager.
To hook in custom thread pool providers (e.g. for J2EE servers) a
`ThreadPoolFactory` interface can be implemented. The implementation can
be set in the `ExecutorServiceManager`.
+
+== Virtual Threads
+
+Starting from Java 21, the default `ThreadPoolFactory` can build `ExecutorService` and `ScheduledExecutorService` that
+use https://openjdk.org/jeps/425[virtual threads] instead of platform threads.
+But as it is an experimental feature, it is not enabled by default, you need to set the System property `camel.threads.virtual.enabled`
+to `true` and run Camel using Java 21 or above to enable it.
+
+Be aware that even if it is enabled, there are some use cases where platform threads are still used, for example, if the
+thread factory is configured to create non-daemon threads since virtual threads can only be daemons, or when the
+`ExecutorService` or `ScheduledExecutorService` to build cannot have more than one thread or finally when `corePoolSize`
+is set to zero and `maxQueueSize` is set to a value less or equal to `0`.