You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/09 15:28:00 UTC

git commit: CAMEL-6522: Added bossPool and workerPool options to Netty consumer to allow sharing thread pools more easily.

Updated Branches:
  refs/heads/master 371efe7e0 -> 4664d64c6


CAMEL-6522: Added bossPool and workerPool options to Netty consumer to allow sharing thread pools more easily.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4664d64c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4664d64c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4664d64c

Branch: refs/heads/master
Commit: 4664d64c69840637b564b5f65c5ec0f026e46c10
Parents: 371efe7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 9 15:07:12 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 9 15:27:42 2013 +0200

----------------------------------------------------------------------
 .../netty/CamelNettyThreadNameDeterminer.java   | 36 +++++++
 .../component/netty/NettyBossPoolBuilder.java   | 67 +++++++++++++
 .../camel/component/netty/NettyHelper.java      |  1 +
 .../NettyServerBootstrapConfiguration.java      | 39 ++++++++
 .../component/netty/NettyWorkerPoolBuilder.java | 80 ++++++++++++++++
 .../SingleTCPNettyServerBootstrapFactory.java   | 61 ++++++------
 .../SingleUDPNettyServerBootstrapFactory.java   | 31 +++---
 ...UseSharedWorkerThreadPoolManyRoutesTest.java | 84 +++++++++++++++++
 .../NettyUseSharedWorkerThreadPoolTest.java     | 99 ++++++++++++++++++++
 ...pringNettyUseSharedWorkerThreadPoolTest.java | 51 ++++++++++
 .../src/test/resources/log4j.properties         |  2 +-
 ...SpringNettyUseSharedWorkerThreadPoolTest.xml | 52 ++++++++++
 12 files changed, 553 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
new file mode 100644
index 0000000..c18ce69
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.util.concurrent.ThreadHelper;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+public class CamelNettyThreadNameDeterminer implements ThreadNameDeterminer {
+
+    private final String pattern;
+    private final String name;
+
+    public CamelNettyThreadNameDeterminer(String pattern, String name) {
+        this.pattern = pattern;
+        this.name = name;
+    }
+
+    @Override
+    public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
+        return ThreadHelper.resolveThreadName(pattern, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
new file mode 100644
index 0000000..13c0876
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerBossPool;
+
+/**
+ * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools
+ * with multiple Netty {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyBossPoolBuilder {
+
+    private String name = "NettyBoss";
+    private String pattern;
+    private int bossCount = 1;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    public NettyBossPoolBuilder withName(String name) {
+        setName(name);
+        return this;
+    }
+
+    public NettyBossPoolBuilder withPattern(String pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    public NettyBossPoolBuilder withBossCount(int bossCount) {
+        setBossCount(bossCount);
+        return this;
+    }
+
+    /**
+     * Creates a new boss pool.
+     */
+    BossPool build() {
+        return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 36d325a..05f3e4d 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class NettyHelper {
 
+    public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
     private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class);
 
     private NettyHelper() {

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
index 6ea9d9e..7df49b0 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
@@ -18,8 +18,11 @@ package org.apache.camel.component.netty;
 
 import java.io.File;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.util.jsse.SSLContextParameters;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
 import org.jboss.netty.handler.ssl.SslHandler;
 
 public class NettyServerBootstrapConfiguration implements Cloneable {
@@ -31,6 +34,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
     protected long sendBufferSize = 65536;
     protected long receiveBufferSize = 65536;
     protected int receiveBufferSizePredictor;
+    protected int bossCount = 1;
     protected int workerCount;
     protected boolean keepAlive = true;
     protected boolean tcpNoDelay = true;
@@ -52,6 +56,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
     protected String keyStoreFormat;
     protected String securityProvider;
     protected String passphrase;
+    protected BossPool bossPool;
+    protected WorkerPool workerPool;
 
     public String getAddress() {
         return host + ":" + port;
@@ -125,6 +131,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
         this.workerCount = workerCount;
     }
 
+    public int getBossCount() {
+        return bossCount;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
     public boolean isKeepAlive() {
         return keepAlive;
     }
@@ -281,6 +295,22 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
         this.options = options;
     }
 
+    public BossPool getBossPool() {
+        return bossPool;
+    }
+
+    public void setBossPool(BossPool bossPool) {
+        this.bossPool = bossPool;
+    }
+
+    public WorkerPool getWorkerPool() {
+        return workerPool;
+    }
+
+    public void setWorkerPool(WorkerPool workerPool) {
+        this.workerPool = workerPool;
+    }
+
     /**
      * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
      * with this, as a Netty listener bound on port X shares the same common
@@ -305,6 +335,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
             isCompatible = false;
         } else if (workerCount != other.workerCount) {
             isCompatible = false;
+        } else if (bossCount != other.bossCount) {
+            isCompatible = false;
         } else if (keepAlive != other.keepAlive) {
             isCompatible = false;
         } else if (tcpNoDelay != other.tcpNoDelay) {
@@ -352,6 +384,10 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
             isCompatible = false;
         } else if (passphrase != null && !passphrase.equals(other.passphrase)) {
             isCompatible = false;
+        } else if (bossPool != other.bossPool) {
+            isCompatible = false;
+        } else if (workerPool != other.workerPool) {
+            isCompatible = false;
         }
 
         return isCompatible;
@@ -367,6 +403,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
                 + ", receiveBufferSize=" + receiveBufferSize
                 + ", receiveBufferSizePredictor=" + receiveBufferSizePredictor
                 + ", workerCount=" + workerCount
+                + ", bossCount=" + bossCount
                 + ", keepAlive=" + keepAlive
                 + ", tcpNoDelay=" + tcpNoDelay
                 + ", reuseAddress=" + reuseAddress
@@ -386,6 +423,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
                 + ", keyStoreFormat='" + keyStoreFormat + '\''
                 + ", securityProvider='" + securityProvider + '\''
                 + ", passphrase='" + passphrase + '\''
+                + ", bossPool=" + bossPool
+                + ", workerPool=" + workerPool
                 + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
new file mode 100644
index 0000000..2f175d5
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+
+/**
+ * A builder to create Netty {@link WorkerPool} which can be used for sharing worker pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyWorkerPoolBuilder {
+
+    private String name = "NettyWorker";
+    private String pattern;
+    private int workerCount;
+    private volatile WorkerPool workerPool;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+    }
+
+    public NettyWorkerPoolBuilder withName(String name) {
+        setName(name);
+        return this;
+    }
+
+    public NettyWorkerPoolBuilder withPattern(String pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    public NettyWorkerPoolBuilder withWorkerCount(int workerCount) {
+        setWorkerCount(workerCount);
+        return this;
+    }
+
+    /**
+     * Creates a new worker pool.
+     */
+    WorkerPool build() {
+        int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS;
+        workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), count, new CamelNettyThreadNameDeterminer(pattern, name));
+        return workerPool;
+    }
+
+    /**
+     * Shutdown the created worker pool
+     */
+    public void destroy() {
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index 23e90b0..3533116 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.camel.CamelContext;
@@ -31,7 +29,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.BossPool;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +49,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
     private ChannelFactory channelFactory;
     private ServerBootstrap serverBootstrap;
     private Channel channel;
-    private ExecutorService bossExecutor;
-    private ExecutorService workerExecutor;
+    private BossPool bossPool;
+    private WorkerPool workerPool;
 
     public SingleTCPNettyServerBootstrapFactory() {
         this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName());
@@ -98,21 +98,29 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
     }
 
     protected void startServerBootstrap() {
-        if (camelContext != null) {
-            bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
-            workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
-        } else {
-            bossExecutor = Executors.newCachedThreadPool(threadFactory);
-            workerExecutor = Executors.newCachedThreadPool(threadFactory);
+        // prefer using explicit configured thread pools
+        BossPool bp = configuration.getBossPool();
+        WorkerPool wp = configuration.getWorkerPool();
+
+        if (bp == null) {
+            // create new pool which we should shutdown when stopping as its not shared
+            bossPool = new NettyBossPoolBuilder()
+                    .withBossCount(configuration.getBossCount())
+                    .withName("NettyTCPBoss")
+                    .build();
+            bp = bossPool;
         }
-
-        if (configuration.getWorkerCount() <= 0) {
-            channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
-        } else {
-            channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
-                    configuration.getWorkerCount());
+        if (wp == null) {
+            // create new pool which we should shutdown when stopping as its not shared
+            workerPool = new NettyWorkerPoolBuilder()
+                    .withWorkerCount(configuration.getWorkerCount())
+                    .withName("NettyTCPWorker")
+                    .build();
+            wp = workerPool;
         }
 
+        channelFactory = new NioServerSocketChannelFactory(bp, wp);
+
         serverBootstrap = new ServerBootstrap(channelFactory);
         serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
         serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
@@ -156,21 +164,14 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
         }
 
         // and then shutdown the thread pools
-        if (bossExecutor != null) {
-            if (camelContext != null) {
-                camelContext.getExecutorServiceManager().shutdown(bossExecutor);
-            } else {
-                bossExecutor.shutdownNow();
-            }
-            bossExecutor = null;
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
         }
-        if (workerExecutor != null) {
-            if (camelContext != null) {
-                camelContext.getExecutorServiceManager().shutdown(workerExecutor);
-            } else {
-                workerExecutor.shutdownNow();
-            }
-            workerExecutor = null;
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index 53bc792..ae9aaef 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
@@ -33,6 +32,8 @@ import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
 
     protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
+    private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
     private final ChannelGroup allChannels;
     private CamelContext camelContext;
     private ThreadFactory threadFactory;
@@ -50,7 +52,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
     private DatagramChannelFactory datagramChannelFactory;
     private ConnectionlessBootstrap connectionlessBootstrap;
     private Channel channel;
-    private ExecutorService workerExecutor;
+    private WorkerPool workerPool;
 
     public SingleUDPNettyServerBootstrapFactory() {
         this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
@@ -98,17 +100,11 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
     }
 
     protected void startServerBootstrap() {
-        if (camelContext != null) {
-            workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
-        } else {
-            workerExecutor = Executors.newCachedThreadPool(threadFactory);
-        }
+        // create non-shared worker pool
+        int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : DEFAULT_IO_THREADS;
+        workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
 
-        if (configuration.getWorkerCount() <= 0) {
-            datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
-        } else {
-            datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
-        }
+        datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
 
         connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
         connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
@@ -161,13 +157,10 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         }
 
         // and then shutdown the thread pools
-        if (workerExecutor != null) {
-            if (camelContext != null) {
-                camelContext.getExecutorServiceManager().shutdown(workerExecutor);
-            } else {
-                workerExecutor.shutdownNow();
-            }
-            workerExecutor = null;
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
new file mode 100644
index 0000000..14e1c78
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest {
+
+    private JndiRegistry jndi;
+    private BossPool sharedBoos;
+    private WorkerPool sharedWorker;
+    private int before;
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        before = Thread.activeCount();
+        super.setUp();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        jndi = super.createRegistry();
+        return jndi;
+    }
+
+    @Test
+    public void testSharedThreadPool() throws Exception {
+        int delta = Thread.activeCount() - before;
+
+        log.info("Created threads {}", delta);
+        assertTrue("There should not be created so many threads: " + delta, delta < 50);
+
+        sharedWorker.shutdown();
+        sharedBoos.shutdown();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                sharedWorker = new NettyWorkerPoolBuilder().withWorkerCount(10).build();
+                jndi.bind("sharedWorker", sharedWorker);
+                sharedBoos = new NettyBossPoolBuilder().withBossCount(20).build();
+                jndi.bind("sharedBoss", sharedBoos);
+
+                for (int i = 0; i < 100; i++) {
+                    from("netty:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&orderedThreadPoolExecutor=false"
+                            + "&bossPool=#sharedBoss&workerPool=#sharedWorker")
+                        .validate(body().isInstanceOf(String.class))
+                        .to("log:result")
+                        .to("mock:result")
+                        .transform(body().regexReplaceAll("Hello", "Bye"));
+                }
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
new file mode 100644
index 0000000..f32de27
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest {
+
+    private JndiRegistry jndi;
+    private WorkerPool shared;
+    private int port;
+    private int port2;
+    private int port3;
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        jndi = super.createRegistry();
+        return jndi;
+    }
+
+    @Test
+    public void testSharedThreadPool() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(30);
+
+        for (int i = 0; i < 10; i++) {
+            String reply = template.requestBody("netty:tcp://localhost:" + port + "?textline=true&sync=true", "Hello World", String.class);
+            assertEquals("Bye World", reply);
+
+            reply = template.requestBody("netty:tcp://localhost:" + port2 + "?textline=true&sync=true", "Hello Camel", String.class);
+            assertEquals("Hi Camel", reply);
+
+            reply = template.requestBody("netty:tcp://localhost:" + port3 + "?textline=true&sync=true", "Hello Claus", String.class);
+            assertEquals("Hej Claus", reply);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        shared.shutdown();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // we have 3 routes, but lets try to have only 2 threads in the pool
+                shared = new NettyWorkerPoolBuilder().withWorkerCount(2).build();
+                jndi.bind("sharedPool", shared);
+
+                port = getPort();
+                port2 = getNextPort();
+                port3 = getNextPort();
+
+                from("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+                    .validate(body().isInstanceOf(String.class))
+                    .to("log:result")
+                    .to("mock:result")
+                    .transform(body().regexReplaceAll("Hello", "Bye"));
+
+                from("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+                    .validate(body().isInstanceOf(String.class))
+                    .to("log:result")
+                    .to("mock:result")
+                    .transform(body().regexReplaceAll("Hello", "Hi"));
+
+                from("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+                    .validate(body().isInstanceOf(String.class))
+                    .to("log:result")
+                    .to("mock:result")
+                    .transform(body().regexReplaceAll("Hello", "Hej"));
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
new file mode 100644
index 0000000..0cc1bd6
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version 
+ */
+public class SpringNettyUseSharedWorkerThreadPoolTest extends CamelSpringTestSupport {
+
+    @Test
+    public void testSharedThreadPool() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(30);
+
+        for (int i = 0; i < 10; i++) {
+            String reply = template.requestBody("netty:tcp://localhost:5021?textline=true&sync=true", "Hello World", String.class);
+            assertEquals("Hello World", reply);
+
+            reply = template.requestBody("netty:tcp://localhost:5022?textline=true&sync=true", "Hello Camel", String.class);
+            assertEquals("Hello Camel", reply);
+
+            reply = template.requestBody("netty:tcp://localhost:5023?textline=true&sync=true", "Hello Claus", String.class);
+            assertEquals("Hello Claus", reply);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/resources/log4j.properties b/components/camel-netty/src/test/resources/log4j.properties
index 1fa01e4..35d1b54 100644
--- a/components/camel-netty/src/test/resources/log4j.properties
+++ b/components/camel-netty/src/test/resources/log4j.properties
@@ -29,7 +29,7 @@ log4j.rootLogger=INFO, file
 log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 #log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-35.35t] %-5p %-30.30c{1} - %m%n
 
 # File appender
 log4j.appender.file=org.apache.log4j.FileAppender

http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
new file mode 100644
index 0000000..7ea4d2d
--- /dev/null
+++ b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <!-- use the worker pool builder to create to help create the shared thread pool -->
+  <bean id="poolBuilder" class="org.apache.camel.component.netty.NettyWorkerPoolBuilder">
+    <property name="workerCount" value="2"/>
+  </bean>
+
+  <!-- the shared worker thread pool -->
+  <bean id="sharedPool" class="org.jboss.netty.channel.socket.nio.WorkerPool"
+        factory-bean="poolBuilder" factory-method="build" destroy-method="shutdown">
+  </bean>
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+    <route>
+      <from uri="netty:tcp://localhost:5021?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+      <to uri="log:result"/>
+      <to uri="mock:result"/>
+    </route>
+
+    <route>
+      <from uri="netty:tcp://localhost:5022?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+      <to uri="log:result"/>
+      <to uri="mock:result"/>
+    </route>
+
+    <route>
+      <from uri="netty:tcp://localhost:5023?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+      <to uri="log:result"/>
+      <to uri="mock:result"/>
+    </route>
+  </camelContext>
+
+</beans>