You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/04/09 17:12:37 UTC
svn commit: r1311277 - in /camel/trunk/components/camel-mina2: ./
src/main/java/org/apache/camel/component/mina2/
Author: davsclaus
Date: Mon Apr 9 15:12:37 2012
New Revision: 1311277
URL: http://svn.apache.org/viewvc?rev=1311277&view=rev
Log:
CAMEL-4853: Use threading accordingly to mina documentation.
Modified:
camel/trunk/components/camel-mina2/ (props changed)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
Propchange: camel/trunk/components/camel-mina2/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Apr 9 15:12:37 2012
@@ -14,3 +14,4 @@ eclipse-classes
*.ipr
*.iml
*.iws
+.idea
Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java Mon Apr 9 15:12:37 2012
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.util.ObjectHelper;
import org.apache.mina.core.filterchain.IoFilter;
@@ -75,13 +76,25 @@ public class Mina2Component extends Defa
String protocol = config.getProtocol();
// if mistyped uri then protocol can be null
+ Mina2Endpoint endpoint = null;
if (protocol != null) {
if (protocol.equals("tcp") || config.isDatagramProtocol() || protocol.equals("vm")) {
- return new Mina2Endpoint(uri, this, config);
+ endpoint = new Mina2Endpoint(uri, this, config);
}
}
- // protocol not resolved so error
- throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+ if (endpoint == null) {
+ // protocol not resolved so error
+ throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+ }
+
+ // set sync or async mode after endpoint is created
+ if (config.isSync()) {
+ endpoint.setExchangePattern(ExchangePattern.InOut);
+ } else {
+ endpoint.setExchangePattern(ExchangePattern.InOnly);
+ }
+
+ return endpoint;
}
// Properties
Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java Mon Apr 9 15:12:37 2012
@@ -51,6 +51,7 @@ public class Mina2Configuration implemen
private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
private SSLContextParameters sslContextParameters;
private boolean autoStartTls = true;
+ private int maximumPoolSize = 16; // 16 is the default mina setting
/**
* Returns a copy of this configuration
@@ -245,4 +246,12 @@ public class Mina2Configuration implemen
public void setAutoStartTls(boolean autoStartTls) {
this.autoStartTls = autoStartTls;
}
+
+ public int getMaximumPoolSize() {
+ return maximumPoolSize;
+ }
+
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ this.maximumPoolSize = maximumPoolSize;
+ }
}
Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java Mon Apr 9 15:12:37 2012
@@ -20,6 +20,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
@@ -39,6 +40,7 @@ import org.apache.mina.filter.codec.Prot
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
@@ -60,6 +62,7 @@ public class Mina2Consumer extends Defau
private SocketAddress address;
private IoAcceptor acceptor;
private Mina2Configuration configuration;
+ private ExecutorService workerPool;
public Mina2Consumer(final Mina2Endpoint endpoint, Processor processor) throws Exception {
super(endpoint, processor);
@@ -72,11 +75,11 @@ public class Mina2Consumer extends Defau
String protocol = configuration.getProtocol();
if (protocol.equals("tcp")) {
- createSocketEndpoint(protocol, configuration);
+ setupSocketProtocol(protocol, configuration);
} else if (configuration.isDatagramProtocol()) {
- createDatagramEndpoint(protocol, configuration);
+ setupDatagramProtocol(protocol, configuration);
} else if (protocol.equals("vm")) {
- createVmEndpoint(protocol, configuration);
+ setupVmProtocol(protocol, configuration);
}
}
@@ -96,9 +99,18 @@ public class Mina2Consumer extends Defau
super.doStop();
}
+ @Override
+ protected void doShutdown() throws Exception {
+ if (workerPool != null) {
+ workerPool.shutdown();
+ }
+ super.doShutdown();
+ }
+
+
// Implementation methods
//-------------------------------------------------------------------------
- protected void createVmEndpoint(String uri, Mina2Configuration configuration) {
+ protected void setupVmProtocol(String uri, Mina2Configuration configuration) {
boolean minaLogger = configuration.isMinaLogger();
List<IoFilter> filters = configuration.getFilters();
@@ -118,22 +130,24 @@ public class Mina2Consumer extends Defau
}
}
- protected void createSocketEndpoint(String uri, Mina2Configuration configuration) throws Exception {
+ protected void setupSocketProtocol(String uri, Mina2Configuration configuration) throws Exception {
LOG.debug("createSocketEndpoint");
boolean minaLogger = configuration.isMinaLogger();
List<IoFilter> filters = configuration.getFilters();
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
- acceptor = new NioSocketAcceptor(
- new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor")));
+ final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+ acceptor = new NioSocketAcceptor(processorCount);
// acceptor connectorConfig
configureCodecFactory("Mina2Consumer", acceptor, configuration);
((NioSocketAcceptor) acceptor).setReuseAddress(true);
acceptor.setCloseOnDeactivation(true);
- acceptor.getFilterChain().addLast("threadPool",
- new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+
+ // using the unordered thread pool is fine as we dont need ordered invocation in our response handler
+ workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+ acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -179,19 +193,17 @@ public class Mina2Consumer extends Defau
}
- protected void createDatagramEndpoint(String uri, Mina2Configuration configuration) {
+ protected void setupDatagramProtocol(String uri, Mina2Configuration configuration) {
boolean minaLogger = configuration.isMinaLogger();
List<IoFilter> filters = configuration.getFilters();
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
- acceptor = new NioDatagramAcceptor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
+ acceptor = new NioDatagramAcceptor();
// acceptor connectorConfig
configureDataGramCodecFactory("MinaConsumer", acceptor, configuration);
acceptor.setCloseOnDeactivation(true);
// reuse address is default true for datagram
- //acceptor.getFilterChain().addLast("threadPool",
- // new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "MinaThreadPool")));
if (minaLogger) {
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
}
Modified: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java?rev=1311277&r1=1311276&r2=1311277&view=diff
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java (original)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java Mon Apr 9 15:12:37 2012
@@ -21,11 +21,11 @@ import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.converter.IOConverter;
@@ -46,10 +46,10 @@ import org.apache.mina.filter.codec.Prot
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioDatagramConnector;
-import org.apache.mina.transport.socket.nio.NioProcessor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
@@ -74,10 +74,11 @@ public class Mina2Producer extends Defau
private CamelLogger noReplyLogger;
private Mina2Configuration configuration;
private IoSessionConfig connectorConfig;
+ private ExecutorService workerPool;
public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
super(endpoint);
- configuration = endpoint.getConfiguration();
+ this.configuration = endpoint.getConfiguration();
this.lazySessionCreation = configuration.isLazySessionCreation();
this.timeout = configuration.getTimeout();
this.sync = configuration.isSync();
@@ -85,11 +86,11 @@ public class Mina2Producer extends Defau
String protocol = configuration.getProtocol();
if (protocol.equals("tcp")) {
- createSocketEndpoint(protocol);
+ setupSocketProtocol(protocol);
} else if (configuration.isDatagramProtocol()) {
- createDatagramEndpoint(protocol);
+ setupDatagramProtocol(protocol);
} else if (protocol.equals("vm")) {
- createVmEndpoint(protocol);
+ setupVmProtocol(protocol);
}
}
@@ -107,9 +108,6 @@ public class Mina2Producer extends Defau
@SuppressWarnings("deprecation")
public void process(Exchange exchange) throws Exception {
- LOG.debug("Mina2Producer process");
-
-
if (session == null && !lazySessionCreation) {
throw new IllegalStateException("Not started yet!");
}
@@ -149,7 +147,7 @@ public class Mina2Producer extends Defau
// byte arrays is not readable so convert to string
out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
}
- LOG.debug("Writing body : {}", out);
+ LOG.debug("Writing body: {}", out);
}
// write the body
Mina2Helper.writeBody(session, body, exchange);
@@ -193,9 +191,7 @@ public class Mina2Producer extends Defau
disconnect = close;
}
if (disconnect) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing session when complete at address: {}", address);
- }
+ LOG.debug("Closing session when complete at address: {}", address);
session.close(true);
}
}
@@ -221,6 +217,14 @@ public class Mina2Producer extends Defau
super.doStop();
}
+ @Override
+ protected void doShutdown() throws Exception {
+ if (workerPool != null) {
+ workerPool.shutdown();
+ }
+ super.doShutdown();
+ }
+
private void closeConnection() {
if (session != null) {
CloseFuture closeFuture = session.close(true);
@@ -247,8 +251,7 @@ public class Mina2Producer extends Defau
// Implementation methods
//-------------------------------------------------------------------------
- protected void createVmEndpoint(String uri) {
-
+ protected void setupVmProtocol(String uri) {
boolean minaLogger = configuration.isMinaLogger();
List<IoFilter> filters = configuration.getFilters();
@@ -265,30 +268,24 @@ public class Mina2Producer extends Defau
+ ", but an SSLContextParameters instance was provided. SSLContextParameters is only supported on the TCP protocol.");
}
configureCodecFactory("Mina2Producer", connector);
-
- // set sync or async mode after endpoint is created
- if (sync) {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
- } else {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
- }
}
- protected void createSocketEndpoint(String uri) throws Exception {
- LOG.debug("createSocketEndpoint");
+ protected void setupSocketProtocol(String uri) throws Exception {
boolean minaLogger = configuration.isMinaLogger();
long timeout = configuration.getTimeout();
List<IoFilter> filters = configuration.getFilters();
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
- connector = new NioSocketConnector(
- new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector")));
+ final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+ connector = new NioSocketConnector(processorCount);
// connector config
connectorConfig = connector.getSessionConfig();
- connector.getFilterChain().addLast("threadPool",
- new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+
+ // using the unordered thread pool is fine as we dont need ordered invocation in our response handler
+ workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+ connector.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
connector.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -299,17 +296,9 @@ public class Mina2Producer extends Defau
configureCodecFactory("Mina2Producer", connector);
// set connect timeout to mina in seconds
connector.setConnectTimeoutMillis(timeout);
-
- // set sync or async mode after endpoint is created
- if (sync) {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
- } else {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
- }
}
protected void configureCodecFactory(String type, IoService service) {
- LOG.debug("configureCodecFactory");
if (configuration.getCodec() != null) {
addCodecFactory(service, configuration.getCodec());
} else if (configuration.isAllowDefaultCodec()) {
@@ -318,7 +307,6 @@ public class Mina2Producer extends Defau
}
protected void configureDefaultCodecFactory(String type, IoService service) {
- LOG.debug("configureDefaultCodecFactory");
if (configuration.isTextline()) {
Charset charset = getEncodingParameter(type, configuration);
LineDelimiter delimiter = getLineDelimiterParameter(configuration.getTextlineDelimiter());
@@ -333,17 +321,15 @@ public class Mina2Producer extends Defau
LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
- codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
+ codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
} else {
ObjectSerializationCodecFactory codecFactory = new ObjectSerializationCodecFactory();
addCodecFactory(service, codecFactory);
LOG.debug("{}: Using ObjectSerializationCodecFactory: {}", type, codecFactory);
}
- LOG.debug("configureDefaultCodecFactory exit");
-
}
- protected void createDatagramEndpoint(String uri) {
+ protected void setupDatagramProtocol(String uri) {
boolean minaLogger = configuration.isMinaLogger();
boolean transferExchange = configuration.isTransferExchange();
List<IoFilter> filters = configuration.getFilters();
@@ -353,11 +339,13 @@ public class Mina2Producer extends Defau
}
address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
- connector = new NioDatagramConnector(
- new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector")));
+ final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+ connector = new NioDatagramConnector(processorCount);
+
+ // using the unordered thread pool is fine as we dont need ordered invocation in our response handler
+ workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
connectorConfig = connector.getSessionConfig();
- connector.getFilterChain().addLast("threadPool",
- new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ connector.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
if (minaLogger) {
connector.getFilterChain().addLast("logger", new LoggingFilter());
}
@@ -369,13 +357,6 @@ public class Mina2Producer extends Defau
configureDataGramCodecFactory("Mina2Producer", connector, configuration);
// set connect timeout to mina in seconds
connector.setConnectTimeoutMillis(timeout);
-
- // set sync or async mode after endpoint is created
- if (sync) {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
- } else {
- this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
- }
}
/**
@@ -482,9 +463,7 @@ public class Mina2Producer extends Defau
public void sessionClosed(IoSession session) throws Exception {
if (sync && !messageReceived) {
// sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
- if (LOG.isDebugEnabled()) {
- LOG.debug("Session closed but no message received from address: {}", address);
- }
+ LOG.debug("Session closed but no message received from address: {}", address);
// session was closed but no message received. This could be because the remote server had an internal error
// and could not return a response. We should count down to stop waiting for a response
countDown();