You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/01 17:13:35 UTC

(camel) 01/01: Add basic support of virtual threads

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3c70161eb902b1238c728601783a62b41f564bf5
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100

    Add basic support of virtual threads
---
 core/camel-support/pom.xml                         |  49 +++++
 .../camel/support/DefaultThreadPoolFactory.java    | 213 +++++++++++++++++++++
 core/camel-util/pom.xml                            |  47 +++++
 .../apache/camel/util/concurrent/ThreadType.java   |  30 +++
 .../camel/util/concurrent/ThreadTypeAware.java     |  23 +++
 .../camel/util/concurrent/CamelThreadFactory.java  |  86 +++++++++
 .../apache/camel/util/concurrent/ThreadType.java   |  38 ++++
 7 files changed, 486 insertions(+)

diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..323216f2f9d
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService, ThreadTypeAware {
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory).newCachedThreadPool(threadFactory);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+        return ThreadPoolFactoryType.from(factory).newThreadPool(profile, factory);
+    }
+
+    public ExecutorService newThreadPool(
+            int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+            boolean allowCoreThreadTimeOut,
+            RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+            throws IllegalArgumentException {
+
+        return ThreadPoolFactoryType.from(threadFactory).newThreadPool(
+                corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+                rejectedExecutionHandler, threadFactory);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory).newScheduledThreadPool(profile, threadFactory);
+    }
+
+    private enum ThreadPoolFactoryType {
+        PLATFORM {
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newCachedThreadPool(threadFactory);
+            }
+
+            ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+                // allow core thread timeout is default true if not configured
+                boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+                return newThreadPool(profile.getPoolSize(),
+                        profile.getMaxPoolSize(),
+                        profile.getKeepAliveTime(),
+                        profile.getTimeUnit(),
+                        profile.getMaxQueueSize(),
+                        allow,
+                        profile.getRejectedExecutionHandler(),
+                        factory);
+            }
+
+            ExecutorService newThreadPool(
+                    int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                    boolean allowCoreThreadTimeOut,
+                    RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                    throws IllegalArgumentException {
+
+                // the core pool size must be 0 or higher
+                if (corePoolSize < 0) {
+                    throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+                }
+
+                // validate max >= core
+                if (maxPoolSize < corePoolSize) {
+                    throw new IllegalArgumentException(
+                            "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+                }
+
+                BlockingQueue<Runnable> workQueue;
+                if (corePoolSize == 0 && maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                    // and force 1 as pool size to be able to create the thread pool by the JDK
+                    corePoolSize = 1;
+                    maxPoolSize = 1;
+                } else if (maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                } else {
+                    // bounded task queue to store tasks on the queue
+                    workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+                }
+
+                ThreadPoolExecutor answer
+                        = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+                answer.setThreadFactory(threadFactory);
+                answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+                answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+                return answer;
+            }
+
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+
+                ScheduledThreadPoolExecutor answer
+                        = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+                answer.setRemoveOnCancelPolicy(true);
+
+                // need to wrap the thread pool in a sized to guard against the problem that the
+                // JDK created thread pool has an unbounded queue (see class javadoc), which mean
+                // we could potentially keep adding tasks, and run out of memory.
+                if (profile.getMaxPoolSize() > 0) {
+                    return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+                } else {
+                    return answer;
+                }
+            }
+        },
+        VIRTUAL {
+            @Override
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+                return Executors.newThreadPerTaskExecutor(factory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+                                          int maxQueueSize, boolean allowCoreThreadTimeOut,
+                                          RejectedExecutionHandler rejectedExecutionHandler,
+                                          ThreadFactory threadFactory) throws IllegalArgumentException {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                return Executors.newScheduledThreadPool(0, threadFactory);
+            }
+        };
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory) {
+            if (ThreadType.current() == ThreadType.PLATFORM) {
+                return ThreadPoolFactoryType.PLATFORM;
+            }
+            return threadFactory instanceof ThreadTypeAware ?
+                    ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+        }
+
+        abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+        abstract ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory);
+
+        abstract ExecutorService newThreadPool(
+                int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                boolean allowCoreThreadTimeOut,
+                RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                throws IllegalArgumentException;
+
+        abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+    }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    public static ThreadType current() {
+        return PLATFORM;
+    }
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
new file mode 100644
index 00000000000..0148e7cdbe8
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * The marker interface indicating whether a given class is thread type aware.
+ */
+public interface ThreadTypeAware {
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..51156065cb1
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * The factory creates virtual threads in case the System property {@code camel.threads.virtual.enabled} set to
+ * {@code true}.
+ */
+public final class CamelThreadFactory implements ThreadFactory, ThreadTypeAware {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+    private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+    private final String pattern;
+    private final String name;
+    private final boolean daemon;
+
+    public CamelThreadFactory(String pattern, String name, boolean daemon) {
+        this.pattern = pattern;
+        this.name = name;
+        this.daemon = daemon;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+        Thread answer = TYPE.newThread(threadName, daemon, runnable);
+
+        LOG.trace("Created thread[{}] -> {}", threadName, answer);
+        return answer;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "CamelThreadFactory[" + name + "]";
+    }
+
+    private enum ThreadFactoryType {
+        PLATFORM {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofPlatform().name(threadName).daemon(daemon);
+            }
+        },
+        VIRTUAL {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofVirtual().name(threadName);
+            }
+        };
+
+        Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+            return newThreadBuilder(threadName, daemon).unstarted(runnable);
+        }
+
+        abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+        static ThreadFactoryType current() {
+            return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+        }
+    }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the existing type of threads. The virtual threads can only be used with JDK 21+ and the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ * The default value is {@code false} which means that platform threads are used by default.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+    private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM;
+    static {
+        LOG.info("The type of thread detected is {}", CURRENT);
+    }
+    public static ThreadType current() {
+        return CURRENT;
+    }
+}