You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/01 17:13:35 UTC
(camel) 01/01: Add basic support of virtual threads
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3c70161eb902b1238c728601783a62b41f564bf5
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100
Add basic support of virtual threads
---
core/camel-support/pom.xml | 49 +++++
.../camel/support/DefaultThreadPoolFactory.java | 213 +++++++++++++++++++++
core/camel-util/pom.xml | 47 +++++
.../apache/camel/util/concurrent/ThreadType.java | 30 +++
.../camel/util/concurrent/ThreadTypeAware.java | 23 +++
.../camel/util/concurrent/CamelThreadFactory.java | 86 +++++++++
.../apache/camel/util/concurrent/ThreadType.java | 38 ++++
7 files changed, 486 insertions(+)
diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
</dependency>
</dependencies>
+ <profiles>
+ <!-- Compiles the Java 21 specific sources; only active when the build runs on JDK 21 or newer -->
+ <profile>
+ <id>java-21-sources</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin-version}</version>
+ <executions>
+ <!-- Regular compilation, declared explicitly next to the additional execution below -->
+ <execution>
+ <id>default-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <!-- Second compilation pass: builds src/main/java21 with release 21 and multi-release output enabled -->
+ <execution>
+ <id>compile-java-21</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <release>21</release>
+ <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+ <multiReleaseOutput>true</multiReleaseOutput>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <!-- Adds the "Multi-Release: true" entry to the JAR manifest -->
+ <manifestEntries>
+ <Multi-Release>true</Multi-Release>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..323216f2f9d
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService, ThreadTypeAware {
+
+ private CamelContext camelContext;
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return ThreadPoolFactoryType.from(threadFactory).newCachedThreadPool(threadFactory);
+ }
+
+ @Override
+ public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+ return ThreadPoolFactoryType.from(factory).newThreadPool(profile, factory);
+ }
+
+ public ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException {
+
+ return ThreadPoolFactoryType.from(threadFactory).newThreadPool(
+ corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+ rejectedExecutionHandler, threadFactory);
+ }
+
+ @Override
+ public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ return ThreadPoolFactoryType.from(threadFactory).newScheduledThreadPool(profile, threadFactory);
+ }
+
+ private enum ThreadPoolFactoryType {
+ PLATFORM {
+ ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return Executors.newCachedThreadPool(threadFactory);
+ }
+
+ ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+ // allow core thread timeout is default true if not configured
+ boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+ return newThreadPool(profile.getPoolSize(),
+ profile.getMaxPoolSize(),
+ profile.getKeepAliveTime(),
+ profile.getTimeUnit(),
+ profile.getMaxQueueSize(),
+ allow,
+ profile.getRejectedExecutionHandler(),
+ factory);
+ }
+
+ ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException {
+
+ // the core pool size must be 0 or higher
+ if (corePoolSize < 0) {
+ throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+ }
+
+ // validate max >= core
+ if (maxPoolSize < corePoolSize) {
+ throw new IllegalArgumentException(
+ "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+ }
+
+ BlockingQueue<Runnable> workQueue;
+ if (corePoolSize == 0 && maxQueueSize <= 0) {
+ // use a synchronous queue for direct-handover (no tasks stored on the queue)
+ workQueue = new SynchronousQueue<>();
+ // and force 1 as pool size to be able to create the thread pool by the JDK
+ corePoolSize = 1;
+ maxPoolSize = 1;
+ } else if (maxQueueSize <= 0) {
+ // use a synchronous queue for direct-handover (no tasks stored on the queue)
+ workQueue = new SynchronousQueue<>();
+ } else {
+ // bounded task queue to store tasks on the queue
+ workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+ }
+
+ ThreadPoolExecutor answer
+ = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+ answer.setThreadFactory(threadFactory);
+ answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+ if (rejectedExecutionHandler == null) {
+ rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+ }
+ answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+ return answer;
+ }
+
+ ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+ if (rejectedExecutionHandler == null) {
+ rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+ }
+
+ ScheduledThreadPoolExecutor answer
+ = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+ answer.setRemoveOnCancelPolicy(true);
+
+ // need to wrap the thread pool in a sized to guard against the problem that the
+ // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+ // we could potentially keep adding tasks, and run out of memory.
+ if (profile.getMaxPoolSize() > 0) {
+ return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+ } else {
+ return answer;
+ }
+ }
+ },
+ VIRTUAL {
+ @Override
+ ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+ return Executors.newThreadPerTaskExecutor(threadFactory);
+ }
+
+ @Override
+ ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+ return Executors.newThreadPerTaskExecutor(factory);
+ }
+
+ @Override
+ ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+ int maxQueueSize, boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler,
+ ThreadFactory threadFactory) throws IllegalArgumentException {
+ return Executors.newThreadPerTaskExecutor(threadFactory);
+ }
+
+ @Override
+ ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+ return Executors.newScheduledThreadPool(0, threadFactory);
+ }
+ };
+
+ static ThreadPoolFactoryType from(ThreadFactory threadFactory) {
+ if (ThreadType.current() == ThreadType.PLATFORM) {
+ return ThreadPoolFactoryType.PLATFORM;
+ }
+ return threadFactory instanceof ThreadTypeAware ?
+ ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+ }
+
+ abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+ abstract ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory);
+
+ abstract ExecutorService newThreadPool(
+ int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+ boolean allowCoreThreadTimeOut,
+ RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+ throws IllegalArgumentException;
+
+ abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+ }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
</plugins>
</build>
</profile>
+ <!-- Compiles the Java 21 specific sources; only active when the build runs on JDK 21 or newer -->
+ <profile>
+ <id>java-21-sources</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin-version}</version>
+ <executions>
+ <!-- Regular compilation, declared explicitly next to the additional execution below -->
+ <execution>
+ <id>default-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <!-- Second compilation pass: builds src/main/java21 with release 21 and multi-release output enabled -->
+ <execution>
+ <id>compile-java-21</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <release>21</release>
+ <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+ <multiReleaseOutput>true</multiReleaseOutput>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <!-- Adds the "Multi-Release: true" entry to the JAR manifest -->
+ <manifestEntries>
+ <Multi-Release>true</Multi-Release>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Enumerates the kinds of threads that can be used. Virtual threads are only available with JDK 21+ and when the
+ * system property {@code camel.threads.virtual.enabled} is set to {@code true}.
+ * <p>
+ * This baseline variant always reports platform threads.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    /**
+     * Reports the type of threads in use.
+     *
+     * @return always {@link #PLATFORM} in this variant
+     */
+    public static ThreadType current() {
+        return ThreadType.PLATFORM;
+    }
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
new file mode 100644
index 00000000000..0148e7cdbe8
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Marker interface, declaring no methods, that a class implements to signal that it is thread type aware.
+ * <p>
+ * NOTE(review): the Java 21 {@code DefaultThreadPoolFactory} tests thread factories with
+ * {@code instanceof ThreadTypeAware} before choosing executors meant for virtual threads; whether other consumers
+ * rely on this marker should be confirmed.
+ */
+public interface ThreadTypeAware {
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..51156065cb1
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * <p>
+ * The factory creates virtual threads when {@link ThreadType#current()} reports {@link ThreadType#VIRTUAL}, which
+ * requires the system property {@code camel.threads.virtual.enabled} to be set to {@code true}; otherwise it
+ * creates platform threads. The returned threads are not started.
+ */
+public final class CamelThreadFactory implements ThreadFactory, ThreadTypeAware {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+    // resolved once when this class is initialized
+    private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+    private final String pattern;
+    private final String name;
+    private final boolean daemon;
+
+    /**
+     * @param pattern the thread name pattern, resolved together with {@code name} for each new thread
+     * @param name    the name of this factory, also used when resolving thread names
+     * @param daemon  whether created platform threads are daemon threads; not applied when virtual threads are
+     *                created
+     */
+    public CamelThreadFactory(String pattern, String name, boolean daemon) {
+        this.pattern = pattern;
+        this.name = name;
+        this.daemon = daemon;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+        Thread answer = TYPE.newThread(threadName, daemon, runnable);
+
+        LOG.trace("Created thread[{}] -> {}", threadName, answer);
+        return answer;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "CamelThreadFactory[" + name + "]";
+    }
+
+    /**
+     * The strategy used to build the threads, one constant per supported {@link ThreadType}.
+     */
+    private enum ThreadFactoryType {
+        PLATFORM {
+            @Override
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofPlatform().name(threadName).daemon(daemon);
+            }
+        },
+        VIRTUAL {
+            @Override
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                // the daemon flag is intentionally not applied: the virtual thread builder offers no such
+                // setting, virtual threads are always daemon threads
+                return Thread.ofVirtual().name(threadName);
+            }
+        };
+
+        /**
+         * Creates a new, unstarted thread running the given task.
+         */
+        Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+            return newThreadBuilder(threadName, daemon).unstarted(runnable);
+        }
+
+        abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+        /**
+         * Maps {@link ThreadType#current()} to the matching strategy.
+         */
+        static ThreadFactoryType current() {
+            return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+        }
+    }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enumerates the kinds of threads that can be used. Virtual threads are only available with JDK 21+ and when the
+ * system property {@code camel.threads.virtual.enabled} is set to {@code true}.
+ * When the property is absent or not {@code true}, platform threads are used.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+
+    /** The thread type, read from the system property once when this enum is initialized. */
+    private static final ThreadType CURRENT;
+
+    static {
+        if (Boolean.getBoolean("camel.threads.virtual.enabled")) {
+            CURRENT = VIRTUAL;
+        } else {
+            CURRENT = PLATFORM;
+        }
+        LOG.info("The type of thread detected is {}", CURRENT);
+    }
+
+    /**
+     * Reports the type of threads in use.
+     *
+     * @return the thread type determined when this enum was initialized
+     */
+    public static ThreadType current() {
+        return CURRENT;
+    }
+}