You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/09 16:09:33 UTC
svn commit: r1311238 - in
/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina:
MinaComponent.java MinaEndpoint.java
Author: davsclaus
Date: Mon Apr 9 14:09:32 2012
New Revision: 1311238
URL: http://svn.apache.org/viewvc?rev=1311238&view=rev
Log:
CAMEL-4853: Prefer to use cached thread pools as Mina documentation suggest. Ensure thread pools is also shutdown.
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=1311238&r1=1311237&r2=1311238&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Mon Apr 9 14:09:32 2012
@@ -22,6 +22,7 @@ import java.net.URI;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -164,10 +165,12 @@ public class MinaComponent extends Defau
List<IoFilter> filters = configuration.getFilters();
final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
- IoAcceptor acceptor = new SocketAcceptor(processorCount,
- getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor"));
- IoConnector connector = new SocketConnector(processorCount,
- getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector"));
+ ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaSocketAcceptor");
+ ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaSocketConnector");
+ ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");
+
+ IoAcceptor acceptor = new SocketAcceptor(processorCount, acceptorPool);
+ IoConnector connector = new SocketConnector(processorCount, connectorPool);
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
// connector config
@@ -175,8 +178,7 @@ public class MinaComponent extends Defau
// must use manual thread model according to Mina documentation
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureCodecFactory("MinaProducer", connectorConfig, configuration);
- connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -192,8 +194,7 @@ public class MinaComponent extends Defau
configureCodecFactory("MinaConsumer", acceptorConfig, configuration);
acceptorConfig.setReuseAddress(true);
acceptorConfig.setDisconnectOnUnbind(true);
- acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -207,6 +208,11 @@ public class MinaComponent extends Defau
endpoint.setConnectorConfig(connectorConfig);
endpoint.setConfiguration(configuration);
+ // enlist threads pools in use on endpoint
+ endpoint.addThreadPool(acceptorPool);
+ endpoint.addThreadPool(connectorPool);
+ endpoint.addThreadPool(workerPool);
+
// set sync or async mode after endpoint is created
if (sync) {
endpoint.setExchangePattern(ExchangePattern.InOut);
@@ -258,8 +264,12 @@ public class MinaComponent extends Defau
boolean sync = configuration.isSync();
List<IoFilter> filters = configuration.getFilters();
- IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
- IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector"));
+ ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramAcceptor");
+ ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramConnector");
+ ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");
+
+ IoAcceptor acceptor = new DatagramAcceptor(acceptorPool);
+ IoConnector connector = new DatagramConnector(connectorPool);
SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
if (transferExchange) {
@@ -270,8 +280,7 @@ public class MinaComponent extends Defau
// must use manual thread model according to Mina documentation
connectorConfig.setThreadModel(ThreadModel.MANUAL);
configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
- connectorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -285,8 +294,7 @@ public class MinaComponent extends Defau
configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
acceptorConfig.setDisconnectOnUnbind(true);
// reuse address is default true for datagram
- acceptorConfig.getFilterChain().addLast("threadPool",
- new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -299,6 +307,12 @@ public class MinaComponent extends Defau
endpoint.setConnector(connector);
endpoint.setConnectorConfig(connectorConfig);
endpoint.setConfiguration(configuration);
+
+ // enlist threads pools in use on endpoint
+ endpoint.addThreadPool(acceptorPool);
+ endpoint.addThreadPool(connectorPool);
+ endpoint.addThreadPool(workerPool);
+
// set sync or async mode after endpoint is created
if (sync) {
endpoint.setExchangePattern(ExchangePattern.InOut);
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=1311238&r1=1311237&r2=1311238&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Mon Apr 9 14:09:32 2012
@@ -17,6 +17,9 @@
package org.apache.camel.component.mina;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
@@ -54,6 +57,7 @@ public class MinaEndpoint extends Defaul
private IoAcceptorConfig acceptorConfig;
private IoConnectorConfig connectorConfig;
private MinaConfiguration configuration;
+ private final List<ExecutorService> executors = new ArrayList<ExecutorService>();
public MinaEndpoint() {
}
@@ -102,6 +106,23 @@ public class MinaEndpoint extends Defaul
return configuration.isDatagramProtocol();
}
+ @Override
+ protected void doShutdown() throws Exception {
+ // shutdown thread pools
+ for (ExecutorService executor : executors) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ }
+ executors.clear();
+ super.doShutdown();
+ }
+
+ /**
+ * Add thread pool which are in-use, we need to un-register when shutting down.
+ */
+ protected void addThreadPool(ExecutorService executorService) {
+ executors.add(executorService);
+ }
+
// Properties
// -------------------------------------------------------------------------