You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/09 16:09:33 UTC

svn commit: r1311238 - in /camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina: MinaComponent.java MinaEndpoint.java

Author: davsclaus
Date: Mon Apr  9 14:09:32 2012
New Revision: 1311238

URL: http://svn.apache.org/viewvc?rev=1311238&view=rev
Log:
CAMEL-4853: Prefer to use cached thread pools as Mina documentation suggest. Ensure thread pools is also shutdown.

Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=1311238&r1=1311237&r2=1311238&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Mon Apr  9 14:09:32 2012
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -164,10 +165,12 @@ public class MinaComponent extends Defau
         List<IoFilter> filters = configuration.getFilters();
         final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
 
-        IoAcceptor acceptor = new SocketAcceptor(processorCount,
-                getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor"));
-        IoConnector connector = new SocketConnector(processorCount,
-                getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector"));
+        ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaSocketAcceptor");
+        ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaSocketConnector");
+        ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");
+
+        IoAcceptor acceptor = new SocketAcceptor(processorCount, acceptorPool);
+        IoConnector connector = new SocketConnector(processorCount, connectorPool);
         SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
         // connector config
@@ -175,8 +178,7 @@ public class MinaComponent extends Defau
         // must use manual thread model according to Mina documentation
         connectorConfig.setThreadModel(ThreadModel.MANUAL);
         configureCodecFactory("MinaProducer", connectorConfig, configuration);
-        connectorConfig.getFilterChain().addLast("threadPool",
-                new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+        connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -192,8 +194,7 @@ public class MinaComponent extends Defau
         configureCodecFactory("MinaConsumer", acceptorConfig, configuration);
         acceptorConfig.setReuseAddress(true);
         acceptorConfig.setDisconnectOnUnbind(true);
-        acceptorConfig.getFilterChain().addLast("threadPool",
-                new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+        acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -207,6 +208,11 @@ public class MinaComponent extends Defau
         endpoint.setConnectorConfig(connectorConfig);
         endpoint.setConfiguration(configuration);
 
+        // enlist threads pools in use on endpoint
+        endpoint.addThreadPool(acceptorPool);
+        endpoint.addThreadPool(connectorPool);
+        endpoint.addThreadPool(workerPool);
+
         // set sync or async mode after endpoint is created
         if (sync) {
             endpoint.setExchangePattern(ExchangePattern.InOut);
@@ -258,8 +264,12 @@ public class MinaComponent extends Defau
         boolean sync = configuration.isSync();
         List<IoFilter> filters = configuration.getFilters();
 
-        IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
-        IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector"));
+        ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramAcceptor");
+        ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramConnector");
+        ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");
+
+        IoAcceptor acceptor = new DatagramAcceptor(acceptorPool);
+        IoConnector connector = new DatagramConnector(connectorPool);
         SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
         if (transferExchange) {
@@ -270,8 +280,7 @@ public class MinaComponent extends Defau
         // must use manual thread model according to Mina documentation
         connectorConfig.setThreadModel(ThreadModel.MANUAL);
         configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
-        connectorConfig.getFilterChain().addLast("threadPool",
-                new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+        connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -285,8 +294,7 @@ public class MinaComponent extends Defau
         configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
         acceptorConfig.setDisconnectOnUnbind(true);
         // reuse address is default true for datagram
-        acceptorConfig.getFilterChain().addLast("threadPool",
-                new ExecutorFilter(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+        acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -299,6 +307,12 @@ public class MinaComponent extends Defau
         endpoint.setConnector(connector);
         endpoint.setConnectorConfig(connectorConfig);
         endpoint.setConfiguration(configuration);
+
+        // enlist threads pools in use on endpoint
+        endpoint.addThreadPool(acceptorPool);
+        endpoint.addThreadPool(connectorPool);
+        endpoint.addThreadPool(workerPool);
+
         // set sync or async mode after endpoint is created
         if (sync) {
             endpoint.setExchangePattern(ExchangePattern.InOut);

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?rev=1311238&r1=1311237&r2=1311238&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Mon Apr  9 14:09:32 2012
@@ -17,6 +17,9 @@
 package org.apache.camel.component.mina;
 
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -54,6 +57,7 @@ public class MinaEndpoint extends Defaul
     private IoAcceptorConfig acceptorConfig;
     private IoConnectorConfig connectorConfig;
     private MinaConfiguration configuration;
+    private final List<ExecutorService> executors = new ArrayList<ExecutorService>();
 
     public MinaEndpoint() {
     }
@@ -102,6 +106,23 @@ public class MinaEndpoint extends Defaul
         return configuration.isDatagramProtocol();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        // shutdown thread pools
+        for (ExecutorService executor : executors) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+        }
+        executors.clear();
+        super.doShutdown();
+    }
+
+    /**
+     * Add thread pool which are in-use, we need to un-register when shutting down.
+     */
+    protected void addThreadPool(ExecutorService executorService) {
+        executors.add(executorService);
+    }
+
     // Properties
     // -------------------------------------------------------------------------