You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/09 15:28:00 UTC
git commit: CAMEL-6522: Added bossPool and workerPool options to
Netty consumer to allow sharing thread pools more easily.
Updated Branches:
refs/heads/master 371efe7e0 -> 4664d64c6
CAMEL-6522: Added bossPool and workerPool options to Netty consumer to allow sharing thread pools more easily.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4664d64c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4664d64c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4664d64c
Branch: refs/heads/master
Commit: 4664d64c69840637b564b5f65c5ec0f026e46c10
Parents: 371efe7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 9 15:07:12 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 9 15:27:42 2013 +0200
----------------------------------------------------------------------
.../netty/CamelNettyThreadNameDeterminer.java | 36 +++++++
.../component/netty/NettyBossPoolBuilder.java | 67 +++++++++++++
.../camel/component/netty/NettyHelper.java | 1 +
.../NettyServerBootstrapConfiguration.java | 39 ++++++++
.../component/netty/NettyWorkerPoolBuilder.java | 80 ++++++++++++++++
.../SingleTCPNettyServerBootstrapFactory.java | 61 ++++++------
.../SingleUDPNettyServerBootstrapFactory.java | 31 +++---
...UseSharedWorkerThreadPoolManyRoutesTest.java | 84 +++++++++++++++++
.../NettyUseSharedWorkerThreadPoolTest.java | 99 ++++++++++++++++++++
...pringNettyUseSharedWorkerThreadPoolTest.java | 51 ++++++++++
.../src/test/resources/log4j.properties | 2 +-
...SpringNettyUseSharedWorkerThreadPoolTest.xml | 52 ++++++++++
12 files changed, 553 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
new file mode 100644
index 0000000..c18ce69
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/CamelNettyThreadNameDeterminer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.util.concurrent.ThreadHelper;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+public class CamelNettyThreadNameDeterminer implements ThreadNameDeterminer {
+
+ private final String pattern;
+ private final String name;
+
+ public CamelNettyThreadNameDeterminer(String pattern, String name) {
+ this.pattern = pattern;
+ this.name = name;
+ }
+
+ @Override
+ public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
+ return ThreadHelper.resolveThreadName(pattern, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
new file mode 100644
index 0000000..13c0876
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyBossPoolBuilder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerBossPool;
+
+/**
+ * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boss pools
+ * with multiple Netty {@link org.apache.camel.component.netty.NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyBossPoolBuilder {
+
+ private String name = "NettyBoss";
+ private String pattern;
+ private int bossCount = 1;
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ public void setBossCount(int bossCount) {
+ this.bossCount = bossCount;
+ }
+
+ public NettyBossPoolBuilder withName(String name) {
+ setName(name);
+ return this;
+ }
+
+ public NettyBossPoolBuilder withPattern(String pattern) {
+ setPattern(pattern);
+ return this;
+ }
+
+ public NettyBossPoolBuilder withBossCount(int bossCount) {
+ setBossCount(bossCount);
+ return this;
+ }
+
+ /**
+ * Creates a new boss pool.
+ */
+ BossPool build() {
+ return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 36d325a..05f3e4d 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
*/
public final class NettyHelper {
+ public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class);
private NettyHelper() {
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
index 6ea9d9e..7df49b0 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
@@ -18,8 +18,11 @@ package org.apache.camel.component.netty;
import java.io.File;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.util.jsse.SSLContextParameters;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
import org.jboss.netty.handler.ssl.SslHandler;
public class NettyServerBootstrapConfiguration implements Cloneable {
@@ -31,6 +34,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
protected long sendBufferSize = 65536;
protected long receiveBufferSize = 65536;
protected int receiveBufferSizePredictor;
+ protected int bossCount = 1;
protected int workerCount;
protected boolean keepAlive = true;
protected boolean tcpNoDelay = true;
@@ -52,6 +56,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
protected String keyStoreFormat;
protected String securityProvider;
protected String passphrase;
+ protected BossPool bossPool;
+ protected WorkerPool workerPool;
public String getAddress() {
return host + ":" + port;
@@ -125,6 +131,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
this.workerCount = workerCount;
}
+ public int getBossCount() {
+ return bossCount;
+ }
+
+ public void setBossCount(int bossCount) {
+ this.bossCount = bossCount;
+ }
+
public boolean isKeepAlive() {
return keepAlive;
}
@@ -281,6 +295,22 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
this.options = options;
}
+ public BossPool getBossPool() {
+ return bossPool;
+ }
+
+ public void setBossPool(BossPool bossPool) {
+ this.bossPool = bossPool;
+ }
+
+ public WorkerPool getWorkerPool() {
+ return workerPool;
+ }
+
+ public void setWorkerPool(WorkerPool workerPool) {
+ this.workerPool = workerPool;
+ }
+
/**
* Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
* with this, as a Netty listener bound on port X shares the same common
@@ -305,6 +335,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
isCompatible = false;
} else if (workerCount != other.workerCount) {
isCompatible = false;
+ } else if (bossCount != other.bossCount) {
+ isCompatible = false;
} else if (keepAlive != other.keepAlive) {
isCompatible = false;
} else if (tcpNoDelay != other.tcpNoDelay) {
@@ -352,6 +384,10 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
isCompatible = false;
} else if (passphrase != null && !passphrase.equals(other.passphrase)) {
isCompatible = false;
+ } else if (bossPool != other.bossPool) {
+ isCompatible = false;
+ } else if (workerPool != other.workerPool) {
+ isCompatible = false;
}
return isCompatible;
@@ -367,6 +403,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
+ ", receiveBufferSize=" + receiveBufferSize
+ ", receiveBufferSizePredictor=" + receiveBufferSizePredictor
+ ", workerCount=" + workerCount
+ + ", bossCount=" + bossCount
+ ", keepAlive=" + keepAlive
+ ", tcpNoDelay=" + tcpNoDelay
+ ", reuseAddress=" + reuseAddress
@@ -386,6 +423,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
+ ", keyStoreFormat='" + keyStoreFormat + '\''
+ ", securityProvider='" + securityProvider + '\''
+ ", passphrase='" + passphrase + '\''
+ + ", bossPool=" + bossPool
+ + ", workerPool=" + workerPool
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
new file mode 100644
index 0000000..2f175d5
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyWorkerPoolBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+
+/**
+ * A builder to create Netty {@link WorkerPool} which can be used for sharing worker pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyWorkerPoolBuilder {
+
+ private String name = "NettyWorker";
+ private String pattern;
+ private int workerCount;
+ private volatile WorkerPool workerPool;
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ public void setWorkerCount(int workerCount) {
+ this.workerCount = workerCount;
+ }
+
+ public NettyWorkerPoolBuilder withName(String name) {
+ setName(name);
+ return this;
+ }
+
+ public NettyWorkerPoolBuilder withPattern(String pattern) {
+ setPattern(pattern);
+ return this;
+ }
+
+ public NettyWorkerPoolBuilder withWorkerCount(int workerCount) {
+ setWorkerCount(workerCount);
+ return this;
+ }
+
+ /**
+ * Creates a new worker pool.
+ */
+ WorkerPool build() {
+ int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS;
+ workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), count, new CamelNettyThreadNameDeterminer(pattern, name));
+ return workerPool;
+ }
+
+ /**
+ * Shuts down the created worker pool
+ */
+ public void destroy() {
+ if (workerPool != null) {
+ workerPool.shutdown();
+ workerPool = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index 23e90b0..3533116 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.camel.CamelContext;
@@ -31,7 +29,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.BossPool;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +49,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
private ChannelFactory channelFactory;
private ServerBootstrap serverBootstrap;
private Channel channel;
- private ExecutorService bossExecutor;
- private ExecutorService workerExecutor;
+ private BossPool bossPool;
+ private WorkerPool workerPool;
public SingleTCPNettyServerBootstrapFactory() {
this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName());
@@ -98,21 +98,29 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
}
protected void startServerBootstrap() {
- if (camelContext != null) {
- bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
- workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
- } else {
- bossExecutor = Executors.newCachedThreadPool(threadFactory);
- workerExecutor = Executors.newCachedThreadPool(threadFactory);
+ // prefer using explicit configured thread pools
+ BossPool bp = configuration.getBossPool();
+ WorkerPool wp = configuration.getWorkerPool();
+
+ if (bp == null) {
+ // create new pool which we should shutdown when stopping as it's not shared
+ bossPool = new NettyBossPoolBuilder()
+ .withBossCount(configuration.getBossCount())
+ .withName("NettyTCPBoss")
+ .build();
+ bp = bossPool;
}
-
- if (configuration.getWorkerCount() <= 0) {
- channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
- } else {
- channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
- configuration.getWorkerCount());
+ if (wp == null) {
+ // create new pool which we should shutdown when stopping as it's not shared
+ workerPool = new NettyWorkerPoolBuilder()
+ .withWorkerCount(configuration.getWorkerCount())
+ .withName("NettyTCPWorker")
+ .build();
+ wp = workerPool;
}
+ channelFactory = new NioServerSocketChannelFactory(bp, wp);
+
serverBootstrap = new ServerBootstrap(channelFactory);
serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
@@ -156,21 +164,14 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
}
// and then shutdown the thread pools
- if (bossExecutor != null) {
- if (camelContext != null) {
- camelContext.getExecutorServiceManager().shutdown(bossExecutor);
- } else {
- bossExecutor.shutdownNow();
- }
- bossExecutor = null;
+ if (bossPool != null) {
+ bossPool.shutdown();
+ bossPool = null;
}
- if (workerExecutor != null) {
- if (camelContext != null) {
- camelContext.getExecutorServiceManager().shutdown(workerExecutor);
- } else {
- workerExecutor.shutdownNow();
- }
- workerExecutor = null;
+ if (workerPool != null) {
+ workerPool.shutdown();
+ workerPool = null;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index 53bc792..ae9aaef 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -33,6 +32,8 @@ import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
+ private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private final ChannelGroup allChannels;
private CamelContext camelContext;
private ThreadFactory threadFactory;
@@ -50,7 +52,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
private DatagramChannelFactory datagramChannelFactory;
private ConnectionlessBootstrap connectionlessBootstrap;
private Channel channel;
- private ExecutorService workerExecutor;
+ private WorkerPool workerPool;
public SingleUDPNettyServerBootstrapFactory() {
this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
@@ -98,17 +100,11 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
}
protected void startServerBootstrap() {
- if (camelContext != null) {
- workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
- } else {
- workerExecutor = Executors.newCachedThreadPool(threadFactory);
- }
+ // create non-shared worker pool
+ int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : DEFAULT_IO_THREADS;
+ workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
- if (configuration.getWorkerCount() <= 0) {
- datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
- } else {
- datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
- }
+ datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
@@ -161,13 +157,10 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
}
// and then shutdown the thread pools
- if (workerExecutor != null) {
- if (camelContext != null) {
- camelContext.getExecutorServiceManager().shutdown(workerExecutor);
- } else {
- workerExecutor.shutdownNow();
- }
- workerExecutor = null;
+ if (workerPool != null) {
+ workerPool.shutdown();
+ workerPool = null;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
new file mode 100644
index 0000000..14e1c78
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest {
+
+ private JndiRegistry jndi;
+ private BossPool sharedBoos;
+ private WorkerPool sharedWorker;
+ private int before;
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ before = Thread.activeCount();
+ super.setUp();
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ jndi = super.createRegistry();
+ return jndi;
+ }
+
+ @Test
+ public void testSharedThreadPool() throws Exception {
+ int delta = Thread.activeCount() - before;
+
+ log.info("Created threads {}", delta);
+ assertTrue("There should not be created so many threads: " + delta, delta < 50);
+
+ sharedWorker.shutdown();
+ sharedBoos.shutdown();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ sharedWorker = new NettyWorkerPoolBuilder().withWorkerCount(10).build();
+ jndi.bind("sharedWorker", sharedWorker);
+ sharedBoos = new NettyBossPoolBuilder().withBossCount(20).build();
+ jndi.bind("sharedBoss", sharedBoos);
+
+ for (int i = 0; i < 100; i++) {
+ from("netty:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&orderedThreadPoolExecutor=false"
+ + "&bossPool=#sharedBoss&workerPool=#sharedWorker")
+ .validate(body().isInstanceOf(String.class))
+ .to("log:result")
+ .to("mock:result")
+ .transform(body().regexReplaceAll("Hello", "Bye"));
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
new file mode 100644
index 0000000..f32de27
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest {
+
+ private JndiRegistry jndi;
+ private WorkerPool shared;
+ private int port;
+ private int port2;
+ private int port3;
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ jndi = super.createRegistry();
+ return jndi;
+ }
+
+ @Test
+ public void testSharedThreadPool() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(30);
+
+ for (int i = 0; i < 10; i++) {
+ String reply = template.requestBody("netty:tcp://localhost:" + port + "?textline=true&sync=true", "Hello World", String.class);
+ assertEquals("Bye World", reply);
+
+ reply = template.requestBody("netty:tcp://localhost:" + port2 + "?textline=true&sync=true", "Hello Camel", String.class);
+ assertEquals("Hi Camel", reply);
+
+ reply = template.requestBody("netty:tcp://localhost:" + port3 + "?textline=true&sync=true", "Hello Claus", String.class);
+ assertEquals("Hej Claus", reply);
+ }
+
+ assertMockEndpointsSatisfied();
+
+ shared.shutdown();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // we have 3 routes, but let's try to have only 2 threads in the pool
+ shared = new NettyWorkerPoolBuilder().withWorkerCount(2).build();
+ jndi.bind("sharedPool", shared);
+
+ port = getPort();
+ port2 = getNextPort();
+ port3 = getNextPort();
+
+ from("netty:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+ .validate(body().isInstanceOf(String.class))
+ .to("log:result")
+ .to("mock:result")
+ .transform(body().regexReplaceAll("Hello", "Bye"));
+
+ from("netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+ .validate(body().isInstanceOf(String.class))
+ .to("log:result")
+ .to("mock:result")
+ .transform(body().regexReplaceAll("Hello", "Hi"));
+
+ from("netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedPool&orderedThreadPoolExecutor=false")
+ .validate(body().isInstanceOf(String.class))
+ .to("log:result")
+ .to("mock:result")
+ .transform(body().regexReplaceAll("Hello", "Hej"));
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
new file mode 100644
index 0000000..0cc1bd6
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version
+ */
+public class SpringNettyUseSharedWorkerThreadPoolTest extends CamelSpringTestSupport {
+
+ @Test
+ public void testSharedThreadPool() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(30);
+
+ for (int i = 0; i < 10; i++) {
+ String reply = template.requestBody("netty:tcp://localhost:5021?textline=true&sync=true", "Hello World", String.class);
+ assertEquals("Hello World", reply);
+
+ reply = template.requestBody("netty:tcp://localhost:5022?textline=true&sync=true", "Hello Camel", String.class);
+ assertEquals("Hello Camel", reply);
+
+ reply = template.requestBody("netty:tcp://localhost:5023?textline=true&sync=true", "Hello Claus", String.class);
+ assertEquals("Hello Claus", reply);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected AbstractApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/resources/log4j.properties b/components/camel-netty/src/test/resources/log4j.properties
index 1fa01e4..35d1b54 100644
--- a/components/camel-netty/src/test/resources/log4j.properties
+++ b/components/camel-netty/src/test/resources/log4j.properties
@@ -29,7 +29,7 @@ log4j.rootLogger=INFO, file
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-35.35t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.file=org.apache.log4j.FileAppender
http://git-wip-us.apache.org/repos/asf/camel/blob/4664d64c/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
new file mode 100644
index 0000000..7ea4d2d
--- /dev/null
+++ b/components/camel-netty/src/test/resources/org/apache/camel/component/netty/SpringNettyUseSharedWorkerThreadPoolTest.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <!-- use the worker pool builder to help create the shared thread pool -->
+ <bean id="poolBuilder" class="org.apache.camel.component.netty.NettyWorkerPoolBuilder">
+ <property name="workerCount" value="2"/>
+ </bean>
+
+ <!-- the shared worker thread pool -->
+ <bean id="sharedPool" class="org.jboss.netty.channel.socket.nio.WorkerPool"
+ factory-bean="poolBuilder" factory-method="build" destroy-method="shutdown">
+ </bean>
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="netty:tcp://localhost:5021?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+ <to uri="log:result"/>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="netty:tcp://localhost:5022?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+ <to uri="log:result"/>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="netty:tcp://localhost:5023?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/>
+ <to uri="log:result"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+
+</beans>