Posted to commits@camel.apache.org by da...@apache.org on 2019/12/04 13:54:05 UTC

[camel] branch master updated: CAMEL-14247: camel-netty thread pool for consumer should not be fixed at 16 by default but take into account netty thread pool size and be bigger

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ed419c9  CAMEL-14247: camel-netty thread pool for consumer should not be fixed at 16 by default but take into account netty thread pool size and be bigger
ed419c9 is described below

commit ed419c96a0010bf1a1221c46c9a9bb50ebc589b4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 4 14:53:23 2019 +0100

    CAMEL-14247: camel-netty thread pool for consumer should not be fixed at 16 by default but take into account netty thread pool size and be bigger
---
 .../src/main/docs/netty-http-component.adoc        |  4 +--
 .../camel-netty/src/main/docs/netty-component.adoc |  4 +--
 .../netty/DefaultServerInitializerFactory.java     |  2 --
 .../camel/component/netty/NettyComponent.java      | 39 +++++++++++-----------
 .../apache/camel/component/netty/NettyHelper.java  | 17 ++++++++++
 .../NettyHttpComponentConfiguration.java           | 12 +++++--
 .../springboot/NettyComponentConfiguration.java    | 12 +++++--
 7 files changed, 59 insertions(+), 31 deletions(-)
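
Note on the new sizing rule: the consumer pool size is now computed in NettyComponent.doStart() as Math.max(maximumPoolSize, eventLoopThreads + 1), where eventLoopThreads comes from the io.netty.eventLoopThreads system property and otherwise defaults to twice the available processors. The sketch below reproduces only that arithmetic so the examples in the option description can be checked by hand. The class ConsumerPoolSize and both of its methods are hypothetical names used for illustration. It also substitutes the JDK's Integer.getInteger and Runtime.availableProcessors for Netty's SystemPropertyUtil and NettyRuntime, which is an assumption that may not match Netty's behaviour in every environment.

```java
public class ConsumerPoolSize {

    // Hypothetical helper: same Math.max(...) rule as the commit applies in doStart().
    static int consumerThreads(int maximumPoolSize, int eventLoopThreads) {
        // one more thread than the event loop uses, unless a larger size was configured
        return Math.max(maximumPoolSize, eventLoopThreads + 1);
    }

    // Assumption: JDK stand-in for SystemPropertyUtil.getInt("io.netty.eventLoopThreads", ...)
    // and NettyRuntime.availableProcessors() * 2.
    static int eventLoopThreads() {
        return Integer.getInteger("io.netty.eventLoopThreads",
                Runtime.getRuntime().availableProcessors() * 2);
    }

    public static void main(String[] args) {
        // 8 cores -> 16 event loop threads
        System.out.println(consumerThreads(0, 16));   // option not set
        System.out.println(consumerThreads(10, 16));  // configured value below the floor
        System.out.println(consumerThreads(50, 16));  // configured value above the floor
        System.out.println(consumerThreads(0, eventLoopThreads())); // this machine
    }
}
```

With 8 cores the first two calls yield 17 and the third yields 50, so a configured value takes effect only when it exceeds the event loop thread count plus one.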

diff --git a/components/camel-netty-http/src/main/docs/netty-http-component.adoc b/components/camel-netty-http/src/main/docs/netty-http-component.adoc
index 79093e0..30a5ee5 100644
--- a/components/camel-netty-http/src/main/docs/netty-http-component.adoc
+++ b/components/camel-netty-http/src/main/docs/netty-http-component.adoc
@@ -104,8 +104,8 @@ The Netty HTTP component supports 11 options, which are listed below.
 | *headerFilterStrategy* (advanced) | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter headers. |  | HeaderFilterStrategy
 | *securityConfiguration* (security) | Refers to a org.apache.camel.component.netty.http.NettyHttpSecurityConfiguration for configuring secure web resources. |  | NettyHttpSecurityConfiguration
 | *useGlobalSslContext Parameters* (security) | Enable usage of global SSL context parameters. | false | boolean
-| *maximumPoolSize* (advanced) | The thread pool size for the EventExecutorGroup if its in use. The default value is 16. | 16 | int
-| *executorService* (advanced) | To use the given EventExecutorGroup. |  | EventExecutorGroup
+| *maximumPoolSize* (consumer) | Sets a maximum thread pool size for the netty consumer ordered thread pool. The default size is 2 x cpu core + 1. Setting this value to e.g. 10 will then use 10 threads unless 2 x cpu core + 1 is a higher value, which then will override and be used. For example if there are 8 cores, then the consumer thread pool will be 17. This thread pool is used to route messages received from Netty by Camel. We use a separate thread pool to ensure ordering of messages and a [...]
+| *executorService* (consumer) | To use the given EventExecutorGroup. |  | EventExecutorGroup
 | *sslContextParameters* (security) | To configure security using SSLContextParameters |  | SSLContextParameters
 | *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
diff --git a/components/camel-netty/src/main/docs/netty-component.adoc b/components/camel-netty/src/main/docs/netty-component.adoc
index cd23c1d..81ef113 100644
--- a/components/camel-netty/src/main/docs/netty-component.adoc
+++ b/components/camel-netty/src/main/docs/netty-component.adoc
@@ -61,9 +61,9 @@ The Netty component supports 8 options, which are listed below.
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *maximumPoolSize* (advanced) | The thread pool size for the EventExecutorGroup if its in use. The default value is 16. | 16 | int
+| *maximumPoolSize* (consumer) | Sets a maximum thread pool size for the netty consumer ordered thread pool. The default size is 2 x cpu core + 1. Setting this value to e.g. 10 will then use 10 threads unless 2 x cpu core + 1 is a higher value, which then will override and be used. For example if there are 8 cores, then the consumer thread pool will be 17. This thread pool is used to route messages received from Netty by Camel. We use a separate thread pool to ensure ordering of messages and a [...]
 | *configuration* (advanced) | To use the NettyConfiguration as configuration when creating endpoints. |  | NettyConfiguration
-| *executorService* (advanced) | To use the given EventExecutorGroup. |  | EventExecutorGroup
+| *executorService* (consumer) | To use the given EventExecutorGroup. |  | EventExecutorGroup
 | *useGlobalSslContext Parameters* (security) | Enable usage of global SSL context parameters. | false | boolean
 | *sslContextParameters* (security) | To configure security using SSLContextParameters |  | SSLContextParameters
 | *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
index 31c3787..5489c6f 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
@@ -103,14 +103,12 @@ public class DefaultServerInitializerFactory extends ServerInitializerFactory {
             // Just use EventExecutorGroup from the Netty Component
             EventExecutorGroup applicationExecutor = consumer.getEndpoint().getComponent().getExecutorService();
             addToPipeline("handler", channelPipeline, applicationExecutor, new ServerChannelHandler(consumer));
-
         } else {
             // still use the worker event loop group here
             addToPipeline("handler", channelPipeline, new ServerChannelHandler(consumer));
 
         }
         LOG.trace("Created ChannelPipeline: {}", channelPipeline);
-
     }
 
     private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) {
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
index a579f28..112e580 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
@@ -19,10 +19,10 @@ package org.apache.camel.component.netty;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ThreadFactory;
 
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.NettyRuntime;
 import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.internal.SystemPropertyUtil;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExtendedCamelContext;
@@ -33,16 +33,15 @@ import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.concurrent.CamelThreadFactory;
 
 @Component("netty")
 public class NettyComponent extends DefaultComponent implements SSLContextParametersAware {
 
     @Metadata(label = "advanced")
     private NettyConfiguration configuration;
-    @Metadata(label = "advanced", defaultValue = "16")
-    private int maximumPoolSize = 16;
-    @Metadata(label = "advanced")
+    @Metadata(label = "consumer,advanced")
+    private int maximumPoolSize;
+    @Metadata(label = "consumer,advanced")
     private volatile EventExecutorGroup executorService;
     @Metadata(label = "security", defaultValue = "false")
     private boolean useGlobalSslContextParameters;
@@ -62,9 +61,14 @@ public class NettyComponent extends DefaultComponent implements SSLContextParame
     }
 
     /**
-     * The thread pool size for the EventExecutorGroup if its in use.
-     * <p/>
-     * The default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread pool.
+     * The default size is 2 x cpu core + 1. Setting this value to e.g. 10 will then use 10 threads
+     * unless 2 x cpu core + 1 is a higher value, which then will override and be used. For example
+     * if there are 8 cores, then the consumer thread pool will be 17.
+     *
+     * This thread pool is used to route messages received from Netty by Camel.
+     * We use a separate thread pool to ensure ordering of messages and also in case some messages
+     * will block, then Netty's worker threads (event loop) won't be affected.
      */
     public void setMaximumPoolSize(int maximumPoolSize) {
         this.maximumPoolSize = maximumPoolSize;
@@ -164,21 +168,18 @@ public class NettyComponent extends DefaultComponent implements SSLContextParame
 
         //Only setup the executorService if it is needed
         if (configuration.isUsingExecutorService() && executorService == null) {
-            executorService = createExecutorService();
+            int netty = SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2);
+            // we want one more thread than netty uses for its event loop
+            // and if there is a custom size for maximum pool size then use it, unless the netty event loop has more threads
+            // and therefore we use Math.max to find the highest value
+            int threads = Math.max(maximumPoolSize, netty + 1);
+            executorService = NettyHelper.createExecutorGroup(getCamelContext(), "NettyConsumerExecutorGroup", threads);
+            log.info("Creating shared NettyConsumerExecutorGroup with {} threads", threads);
         }
 
         super.doStart();
     }
 
-    protected EventExecutorGroup createExecutorService() {
-        // Provide the executor service for the application
-        // and use a Camel thread factory so we have consistent thread namings
-        // we should use a shared thread pool as recommended by Netty
-        String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern();
-        ThreadFactory factory = new CamelThreadFactory(pattern, "NettyEventExecutorGroup", true);
-        return new DefaultEventExecutorGroup(getMaximumPoolSize(), factory);
-    }
-
     @Override
     protected void doStop() throws Exception {
         //Only shutdown the executorService if it is created by netty component
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 9b6a39e..d240749 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -18,13 +18,18 @@ package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.concurrent.ThreadFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,4 +126,16 @@ public final class NettyHelper {
         }
     }
 
+    /**
+     * Creates a {@link DefaultEventExecutorGroup} with the given name and maximum thread pool size.
+     */
+    public static EventExecutorGroup createExecutorGroup(CamelContext camelContext, String name, int threads) {
+        // Provide the executor service for the application
+        // and use a Camel thread factory so we have consistent thread namings
+        // we should use a shared thread pool as recommended by Netty
+        String pattern = camelContext.getExecutorServiceManager().getThreadNamePattern();
+        ThreadFactory factory = new CamelThreadFactory(pattern, name, true);
+        return new DefaultEventExecutorGroup(threads, factory);
+    }
+
 }
diff --git a/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
index c5f9743..5de25cd 100644
--- a/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-netty-http-starter/src/main/java/org/apache/camel/component/netty/http/springboot/NettyHttpComponentConfiguration.java
@@ -65,10 +65,16 @@ public class NettyHttpComponentConfiguration
      */
     private Boolean useGlobalSslContextParameters = false;
     /**
-     * The thread pool size for the EventExecutorGroup if its in use. The
-     * default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread
+     * pool. The default size is 2 x cpu core + 1. Setting this value to e.g. 10
+     * will then use 10 threads unless 2 x cpu core + 1 is a higher value, which
+     * then will override and be used. For example if there are 8 cores, then
+     * the consumer thread pool will be 17. This thread pool is used to route
+     * messages received from Netty by Camel. We use a separate thread pool to
+     * ensure ordering of messages and also in case some messages will block,
+     * then Netty's worker threads (event loop) won't be affected.
      */
-    private Integer maximumPoolSize = 16;
+    private Integer maximumPoolSize;
     /**
      * To use the given EventExecutorGroup. The option is a
      * io.netty.util.concurrent.EventExecutorGroup type.
diff --git a/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
index 01fdf34..ff0b188 100644
--- a/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-netty-starter/src/main/java/org/apache/camel/component/netty/springboot/NettyComponentConfiguration.java
@@ -52,10 +52,16 @@ public class NettyComponentConfiguration
      */
     private Boolean enabled;
     /**
-     * The thread pool size for the EventExecutorGroup if its in use. The
-     * default value is 16.
+     * Sets a maximum thread pool size for the netty consumer ordered thread
+     * pool. The default size is 2 x cpu core + 1. Setting this value to e.g. 10
+     * will then use 10 threads unless 2 x cpu core + 1 is a higher value, which
+     * then will override and be used. For example if there are 8 cores, then
+     * the consumer thread pool will be 17. This thread pool is used to route
+     * messages received from Netty by Camel. We use a separate thread pool to
+     * ensure ordering of messages and also in case some messages will block,
+     * then Netty's worker threads (event loop) won't be affected.
      */
-    private Integer maximumPoolSize = 16;
+    private Integer maximumPoolSize;
     /**
      * To use the NettyConfiguration as configuration when creating endpoints.
      */