You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/09 17:12:37 UTC

svn commit: r1311277 - in /camel/trunk/components/camel-mina2: ./ src/main/java/org/apache/camel/component/mina2/

Author: davsclaus
Date: Mon Apr  9 15:12:37 2012
New Revision: 1311277

URL: http://svn.apache.org/viewvc?rev=1311277&view=rev
Log:
CAMEL-4853: Use threading according to the MINA documentation.

Modified:
    camel/trunk/components/camel-mina2/   (props changed)
    camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
    camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
    camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
    camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java

Propchange: camel/trunk/components/camel-mina2/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Apr  9 15:12:37 2012
@@ -14,3 +14,4 @@ eclipse-classes
 *.ipr
 *.iml
 *.iws
+.idea

Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java Mon Apr  9 15:12:37 2012
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.mina.core.filterchain.IoFilter;
@@ -75,13 +76,25 @@ public class Mina2Component extends Defa
         String protocol = config.getProtocol();
         // if mistyped uri then protocol can be null
 
+        Mina2Endpoint endpoint = null;
         if (protocol != null) {
             if (protocol.equals("tcp") || config.isDatagramProtocol() || protocol.equals("vm")) {
-                return new Mina2Endpoint(uri, this, config);
+                endpoint = new Mina2Endpoint(uri, this, config);
             }
         }
-        // protocol not resolved so error
-        throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+        if (endpoint == null) {
+            // protocol not resolved so error
+            throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+        }
+
+        // set sync or async mode after endpoint is created
+        if (config.isSync()) {
+            endpoint.setExchangePattern(ExchangePattern.InOut);
+        } else {
+            endpoint.setExchangePattern(ExchangePattern.InOnly);
+        }
+
+        return endpoint;
     }
 
     // Properties

Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java Mon Apr  9 15:12:37 2012
@@ -51,6 +51,7 @@ public class Mina2Configuration implemen
     private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
     private SSLContextParameters sslContextParameters;
     private boolean autoStartTls = true;
+    private int maximumPoolSize = 16; // 16 is the default mina setting
 
     /**
      * Returns a copy of this configuration
@@ -245,4 +246,12 @@ public class Mina2Configuration implemen
     public void setAutoStartTls(boolean autoStartTls) {
         this.autoStartTls = autoStartTls;
     }
+
+    public int getMaximumPoolSize() {
+        return maximumPoolSize;
+    }
+
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        this.maximumPoolSize = maximumPoolSize;
+    }
 }

Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java Mon Apr  9 15:12:37 2012
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -39,6 +40,7 @@ import org.apache.mina.filter.codec.Prot
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 import org.apache.mina.filter.codec.textline.LineDelimiter;
 import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
@@ -60,6 +62,7 @@ public class Mina2Consumer extends Defau
     private SocketAddress address;
     private IoAcceptor acceptor;
     private Mina2Configuration configuration;
+    private ExecutorService workerPool;
 
     public Mina2Consumer(final Mina2Endpoint endpoint, Processor processor) throws Exception {
         super(endpoint, processor);
@@ -72,11 +75,11 @@ public class Mina2Consumer extends Defau
 
         String protocol = configuration.getProtocol();
         if (protocol.equals("tcp")) {
-            createSocketEndpoint(protocol, configuration);
+            setupSocketProtocol(protocol, configuration);
         } else if (configuration.isDatagramProtocol()) {
-            createDatagramEndpoint(protocol, configuration);
+            setupDatagramProtocol(protocol, configuration);
         } else if (protocol.equals("vm")) {
-            createVmEndpoint(protocol, configuration);
+            setupVmProtocol(protocol, configuration);
         }
     }
 
@@ -96,9 +99,18 @@ public class Mina2Consumer extends Defau
         super.doStop();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        if (workerPool != null) {
+            workerPool.shutdown();
+        }
+        super.doShutdown();
+    }
+
+
     // Implementation methods
     //-------------------------------------------------------------------------
-    protected void createVmEndpoint(String uri, Mina2Configuration configuration) {
+    protected void setupVmProtocol(String uri, Mina2Configuration configuration) {
 
         boolean minaLogger = configuration.isMinaLogger();
         List<IoFilter> filters = configuration.getFilters();
@@ -118,22 +130,24 @@ public class Mina2Consumer extends Defau
         }
     }
 
-    protected void createSocketEndpoint(String uri, Mina2Configuration configuration) throws Exception {
+    protected void setupSocketProtocol(String uri, Mina2Configuration configuration) throws Exception {
         LOG.debug("createSocketEndpoint");
         boolean minaLogger = configuration.isMinaLogger();
         List<IoFilter> filters = configuration.getFilters();
 
         address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
-        acceptor = new NioSocketAcceptor(
-            new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor")));
+        final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+        acceptor = new NioSocketAcceptor(processorCount);
 
         // acceptor connectorConfig
         configureCodecFactory("Mina2Consumer", acceptor, configuration);
         ((NioSocketAcceptor) acceptor).setReuseAddress(true);
         acceptor.setCloseOnDeactivation(true);
-        acceptor.getFilterChain().addLast("threadPool",
-                                          new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+
+        // using the unordered thread pool is fine as we don't need ordered invocation in our response handler
+        workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+        acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             acceptor.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -179,19 +193,17 @@ public class Mina2Consumer extends Defau
 
     }
 
-    protected void createDatagramEndpoint(String uri, Mina2Configuration configuration) {
+    protected void setupDatagramProtocol(String uri, Mina2Configuration configuration) {
         boolean minaLogger = configuration.isMinaLogger();
         List<IoFilter> filters = configuration.getFilters();
 
         address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
-        acceptor = new NioDatagramAcceptor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
+        acceptor = new NioDatagramAcceptor();
 
         // acceptor connectorConfig
         configureDataGramCodecFactory("MinaConsumer", acceptor, configuration);
         acceptor.setCloseOnDeactivation(true);
         // reuse address is default true for datagram
-        //acceptor.getFilterChain().addLast("threadPool",
-        //                                  new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "MinaThreadPool")));
         if (minaLogger) {
             acceptor.getFilterChain().addLast("logger", new LoggingFilter());
         }

Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java Mon Apr  9 15:12:37 2012
@@ -21,11 +21,11 @@ import java.net.SocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.converter.IOConverter;
@@ -46,10 +46,10 @@ import org.apache.mina.filter.codec.Prot
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 import org.apache.mina.filter.codec.textline.LineDelimiter;
 import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioDatagramConnector;
-import org.apache.mina.transport.socket.nio.NioProcessor;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
@@ -74,10 +74,11 @@ public class Mina2Producer extends Defau
     private CamelLogger noReplyLogger;
     private Mina2Configuration configuration;
     private IoSessionConfig connectorConfig;
+    private ExecutorService workerPool;
 
     public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
         super(endpoint);
-        configuration = endpoint.getConfiguration();
+        this.configuration = endpoint.getConfiguration();
         this.lazySessionCreation = configuration.isLazySessionCreation();
         this.timeout = configuration.getTimeout();
         this.sync = configuration.isSync();
@@ -85,11 +86,11 @@ public class Mina2Producer extends Defau
 
         String protocol = configuration.getProtocol();
         if (protocol.equals("tcp")) {
-            createSocketEndpoint(protocol);
+            setupSocketProtocol(protocol);
         } else if (configuration.isDatagramProtocol()) {
-            createDatagramEndpoint(protocol);
+            setupDatagramProtocol(protocol);
         } else if (protocol.equals("vm")) {
-            createVmEndpoint(protocol);
+            setupVmProtocol(protocol);
         }
     }
 
@@ -107,9 +108,6 @@ public class Mina2Producer extends Defau
 
     @SuppressWarnings("deprecation")
     public void process(Exchange exchange) throws Exception {
-        LOG.debug("Mina2Producer process");
-
-
         if (session == null && !lazySessionCreation) {
             throw new IllegalStateException("Not started yet!");
         }
@@ -149,7 +147,7 @@ public class Mina2Producer extends Defau
                 // byte arrays is not readable so convert to string
                 out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
             }
-            LOG.debug("Writing body : {}", out);
+            LOG.debug("Writing body: {}", out);
         }
         // write the body
         Mina2Helper.writeBody(session, body, exchange);
@@ -193,9 +191,7 @@ public class Mina2Producer extends Defau
             disconnect = close;
         }
         if (disconnect) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Closing session when complete at address: {}", address);
-            }
+            LOG.debug("Closing session when complete at address: {}", address);
             session.close(true);
         }
     }
@@ -221,6 +217,14 @@ public class Mina2Producer extends Defau
         super.doStop();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        if (workerPool != null) {
+            workerPool.shutdown();
+        }
+        super.doShutdown();
+    }
+
     private void closeConnection() {
         if (session != null) {
             CloseFuture closeFuture = session.close(true);
@@ -247,8 +251,7 @@ public class Mina2Producer extends Defau
 
     // Implementation methods
     //-------------------------------------------------------------------------
-    protected void createVmEndpoint(String uri) {
-
+    protected void setupVmProtocol(String uri) {
         boolean minaLogger = configuration.isMinaLogger();
         List<IoFilter> filters = configuration.getFilters();
 
@@ -265,30 +268,24 @@ public class Mina2Producer extends Defau
                      + ", but an SSLContextParameters instance was provided.  SSLContextParameters is only supported on the TCP protocol.");
         }
         configureCodecFactory("Mina2Producer", connector);
-
-        // set sync or async mode after endpoint is created
-        if (sync) {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
-        } else {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
-        }
     }
 
-    protected void createSocketEndpoint(String uri) throws Exception {
-        LOG.debug("createSocketEndpoint");
+    protected void setupSocketProtocol(String uri) throws Exception {
         boolean minaLogger = configuration.isMinaLogger();
         long timeout = configuration.getTimeout();
         List<IoFilter> filters = configuration.getFilters();
 
         address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
 
-        connector = new NioSocketConnector(
-            new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector")));
+        final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+        connector = new NioSocketConnector(processorCount);
 
         // connector config
         connectorConfig = connector.getSessionConfig();
-        connector.getFilterChain().addLast("threadPool",
-                                           new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+
+        // using the unordered thread pool is fine as we don't need ordered invocation in our response handler
+        workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+        connector.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             connector.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -299,17 +296,9 @@ public class Mina2Producer extends Defau
         configureCodecFactory("Mina2Producer", connector);
         // set connect timeout to mina in seconds
         connector.setConnectTimeoutMillis(timeout);
-
-        // set sync or async mode after endpoint is created
-        if (sync) {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
-        } else {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
-        }
     }
 
     protected void configureCodecFactory(String type, IoService service) {
-        LOG.debug("configureCodecFactory");
         if (configuration.getCodec() != null) {
             addCodecFactory(service, configuration.getCodec());
         } else if (configuration.isAllowDefaultCodec()) {
@@ -318,7 +307,6 @@ public class Mina2Producer extends Defau
     }
 
     protected void configureDefaultCodecFactory(String type, IoService service) {
-        LOG.debug("configureDefaultCodecFactory");
         if (configuration.isTextline()) {
             Charset charset = getEncodingParameter(type, configuration);
             LineDelimiter delimiter = getLineDelimiterParameter(configuration.getTextlineDelimiter());
@@ -333,17 +321,15 @@ public class Mina2Producer extends Defau
             LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
                       new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
             LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
-                      codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
+                    codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
         } else {
             ObjectSerializationCodecFactory codecFactory = new ObjectSerializationCodecFactory();
             addCodecFactory(service, codecFactory);
             LOG.debug("{}: Using ObjectSerializationCodecFactory: {}", type, codecFactory);
         }
-        LOG.debug("configureDefaultCodecFactory exit");
-
     }
 
-    protected void createDatagramEndpoint(String uri) {
+    protected void setupDatagramProtocol(String uri) {
         boolean minaLogger = configuration.isMinaLogger();
         boolean transferExchange = configuration.isTransferExchange();
         List<IoFilter> filters = configuration.getFilters();
@@ -353,11 +339,13 @@ public class Mina2Producer extends Defau
         }
 
         address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
-        connector = new NioDatagramConnector(
-            new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector")));
+        final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+        connector = new NioDatagramConnector(processorCount);
+
+        // using the unordered thread pool is fine as we don't need ordered invocation in our response handler
+        workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
         connectorConfig = connector.getSessionConfig();
-        connector.getFilterChain().addLast("threadPool",
-                                           new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+        connector.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
         if (minaLogger) {
             connector.getFilterChain().addLast("logger", new LoggingFilter());
         }
@@ -369,13 +357,6 @@ public class Mina2Producer extends Defau
         configureDataGramCodecFactory("Mina2Producer", connector, configuration);
         // set connect timeout to mina in seconds
         connector.setConnectTimeoutMillis(timeout);
-
-        // set sync or async mode after endpoint is created
-        if (sync) {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
-        } else {
-            this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
-        }
     }
 
     /**
@@ -482,9 +463,7 @@ public class Mina2Producer extends Defau
         public void sessionClosed(IoSession session) throws Exception {
             if (sync && !messageReceived) {
                 // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Session closed but no message received from address: {}", address);
-                }
+                LOG.debug("Session closed but no message received from address: {}", address);
                 // session was closed but no message received. This could be because the remote server had an internal error
                 // and could not return a response. We should count down to stop waiting for a response
                 countDown();