You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2023/12/01 17:13:34 UTC

(camel) branch virtual-threads-basic-support created (now 3c70161eb90)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 3c70161eb90 Add basic support of virtual threads

This branch includes the following new commits:

     new 3c70161eb90 Add basic support of virtual threads

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(camel) 01/01: Add basic support of virtual threads

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch virtual-threads-basic-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3c70161eb902b1238c728601783a62b41f564bf5
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Dec 1 18:13:12 2023 +0100

    Add basic support of virtual threads
---
 core/camel-support/pom.xml                         |  49 +++++
 .../camel/support/DefaultThreadPoolFactory.java    | 213 +++++++++++++++++++++
 core/camel-util/pom.xml                            |  47 +++++
 .../apache/camel/util/concurrent/ThreadType.java   |  30 +++
 .../camel/util/concurrent/ThreadTypeAware.java     |  23 +++
 .../camel/util/concurrent/CamelThreadFactory.java  |  86 +++++++++
 .../apache/camel/util/concurrent/ThreadType.java   |  38 ++++
 7 files changed, 486 insertions(+)

diff --git a/core/camel-support/pom.xml b/core/camel-support/pom.xml
index 32c6688c907..2ede683e498 100644
--- a/core/camel-support/pom.xml
+++ b/core/camel-support/pom.xml
@@ -85,4 +85,53 @@
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
new file mode 100644
index 00000000000..323216f2f9d
--- /dev/null
+++ b/core/camel-support/src/main/java21/org/apache/camel/support/DefaultThreadPoolFactory.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.StaticService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
+import org.apache.camel.util.concurrent.ThreadType;
+import org.apache.camel.util.concurrent.ThreadTypeAware;
+
+/**
+ * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
+ */
+public class DefaultThreadPoolFactory extends ServiceSupport implements CamelContextAware, ThreadPoolFactory, StaticService, ThreadTypeAware {
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory).newCachedThreadPool(threadFactory);
+    }
+
+    @Override
+    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+        return ThreadPoolFactoryType.from(factory).newThreadPool(profile, factory);
+    }
+
+    public ExecutorService newThreadPool(
+            int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+            boolean allowCoreThreadTimeOut,
+            RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+            throws IllegalArgumentException {
+
+        return ThreadPoolFactoryType.from(threadFactory).newThreadPool(
+                corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, allowCoreThreadTimeOut,
+                rejectedExecutionHandler, threadFactory);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+        return ThreadPoolFactoryType.from(threadFactory).newScheduledThreadPool(profile, threadFactory);
+    }
+
+    private enum ThreadPoolFactoryType {
+        PLATFORM {
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newCachedThreadPool(threadFactory);
+            }
+
+            ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+                // core thread timeout is allowed by default (true) when the profile does not configure it
+                boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
+                return newThreadPool(profile.getPoolSize(),
+                        profile.getMaxPoolSize(),
+                        profile.getKeepAliveTime(),
+                        profile.getTimeUnit(),
+                        profile.getMaxQueueSize(),
+                        allow,
+                        profile.getRejectedExecutionHandler(),
+                        factory);
+            }
+
+            ExecutorService newThreadPool(
+                    int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                    boolean allowCoreThreadTimeOut,
+                    RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                    throws IllegalArgumentException {
+
+                // the core pool size must be 0 or higher
+                if (corePoolSize < 0) {
+                    throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
+                }
+
+                // validate max >= core
+                if (maxPoolSize < corePoolSize) {
+                    throw new IllegalArgumentException(
+                            "MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
+                }
+
+                BlockingQueue<Runnable> workQueue;
+                if (corePoolSize == 0 && maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                    // and force the pool size to 1 so that the JDK is able to create the thread pool
+                    corePoolSize = 1;
+                    maxPoolSize = 1;
+                } else if (maxQueueSize <= 0) {
+                    // use a synchronous queue for direct-handover (no tasks stored on the queue)
+                    workQueue = new SynchronousQueue<>();
+                } else {
+                    // bounded task queue to store tasks on the queue
+                    workQueue = new LinkedBlockingQueue<>(maxQueueSize);
+                }
+
+                ThreadPoolExecutor answer
+                        = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
+                answer.setThreadFactory(threadFactory);
+                answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+                answer.setRejectedExecutionHandler(rejectedExecutionHandler);
+                return answer;
+            }
+
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
+                if (rejectedExecutionHandler == null) {
+                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
+                }
+
+                ScheduledThreadPoolExecutor answer
+                        = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
+                answer.setRemoveOnCancelPolicy(true);
+
+                // need to wrap the thread pool in a sized executor service to guard against the problem that the
+                // JDK created thread pool has an unbounded queue (see class javadoc), which means
+                // we could potentially keep adding tasks, and run out of memory.
+                if (profile.getMaxPoolSize() > 0) {
+                    return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
+                } else {
+                    return answer;
+                }
+            }
+        },
+        VIRTUAL {
+            @Override
+            ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
+                return Executors.newThreadPerTaskExecutor(factory);
+            }
+
+            @Override
+            ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
+                                          int maxQueueSize, boolean allowCoreThreadTimeOut,
+                                          RejectedExecutionHandler rejectedExecutionHandler,
+                                          ThreadFactory threadFactory) throws IllegalArgumentException {
+                return Executors.newThreadPerTaskExecutor(threadFactory);
+            }
+
+            @Override
+            ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
+                return Executors.newScheduledThreadPool(0, threadFactory);
+            }
+        };
+
+        static ThreadPoolFactoryType from(ThreadFactory threadFactory) {
+            if (ThreadType.current() == ThreadType.PLATFORM) {
+                return ThreadPoolFactoryType.PLATFORM;
+            }
+            return threadFactory instanceof ThreadTypeAware ?
+                    ThreadPoolFactoryType.VIRTUAL : ThreadPoolFactoryType.PLATFORM;
+        }
+
+        abstract ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
+
+        abstract ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory);
+
+        abstract ExecutorService newThreadPool(
+                int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize,
+                boolean allowCoreThreadTimeOut,
+                RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory)
+                throws IllegalArgumentException;
+
+        abstract ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory);
+    }
+}
diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml
index 2933fd9d9b8..c48ad466571 100644
--- a/core/camel-util/pom.xml
+++ b/core/camel-util/pom.xml
@@ -272,5 +272,52 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>java-21-sources</id>
+            <activation>
+                <jdk>[21,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <version>${maven-compiler-plugin-version}</version>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>compile-java-21</id>
+                                <phase>compile</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                                <configuration>
+                                    <release>21</release>
+                                    <compileSourceRoots>${project.basedir}/src/main/java21</compileSourceRoots>
+                                    <multiReleaseOutput>true</multiReleaseOutput>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifestEntries>
+                                    <Multi-Release>true</Multi-Release>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..b048a2640c9
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Defines the supported types of threads. Virtual threads can only be used with JDK 21+ and with the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+
+    public static ThreadType current() {
+        return PLATFORM;
+    }
+}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
new file mode 100644
index 00000000000..0148e7cdbe8
--- /dev/null
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadTypeAware.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+/**
+ * Marker interface indicating that a given class is thread type aware.
+ */
+public interface ThreadTypeAware {
+}
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
new file mode 100644
index 00000000000..51156065cb1
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/CamelThreadFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which creates threads supporting a naming pattern.
+ * The factory creates virtual threads if the system property {@code camel.threads.virtual.enabled} is set to
+ * {@code true}.
+ */
+public final class CamelThreadFactory implements ThreadFactory, ThreadTypeAware {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelThreadFactory.class);
+
+    private static final ThreadFactoryType TYPE = ThreadFactoryType.current();
+
+    private final String pattern;
+    private final String name;
+    private final boolean daemon;
+
+    public CamelThreadFactory(String pattern, String name, boolean daemon) {
+        this.pattern = pattern;
+        this.name = name;
+        this.daemon = daemon;
+    }
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+        String threadName = ThreadHelper.resolveThreadName(pattern, name);
+
+        Thread answer = TYPE.newThread(threadName, daemon, runnable);
+
+        LOG.trace("Created thread[{}] -> {}", threadName, answer);
+        return answer;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "CamelThreadFactory[" + name + "]";
+    }
+
+    private enum ThreadFactoryType {
+        PLATFORM {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofPlatform().name(threadName).daemon(daemon);
+            }
+        },
+        VIRTUAL {
+            Thread.Builder newThreadBuilder(String threadName, boolean daemon) {
+                return Thread.ofVirtual().name(threadName);
+            }
+        };
+
+        Thread newThread(String threadName, boolean daemon, Runnable runnable) {
+            return newThreadBuilder(threadName, daemon).unstarted(runnable);
+        }
+
+        abstract Thread.Builder newThreadBuilder(String threadName, boolean daemon);
+
+        static ThreadFactoryType current() {
+            return ThreadType.current() == ThreadType.VIRTUAL ? VIRTUAL : PLATFORM;
+        }
+    }
+}
+
diff --git a/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
new file mode 100644
index 00000000000..0bc527ecf8e
--- /dev/null
+++ b/core/camel-util/src/main/java21/org/apache/camel/util/concurrent/ThreadType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the supported types of threads. Virtual threads can only be used with JDK 21+ and with the system property
+ * {@code camel.threads.virtual.enabled} set to {@code true}.
+ * The default value is {@code false}, which means that platform threads are used by default.
+ */
+public enum ThreadType {
+    PLATFORM,
+    VIRTUAL;
+    private static final Logger LOG = LoggerFactory.getLogger(ThreadType.class);
+    private static final ThreadType CURRENT = Boolean.getBoolean("camel.threads.virtual.enabled") ? VIRTUAL : PLATFORM;
+    static {
+        LOG.info("The type of thread detected is {}", CURRENT);
+    }
+    public static ThreadType current() {
+        return CURRENT;
+    }
+}