You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/26 15:16:02 UTC
svn commit: r598285 [1/2] - in /incubator/qpid/branches/M2.1.1/java: broker/
broker/etc/ broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/transport/ client/sr...
Author: ritchiem
Date: Mon Nov 26 06:16:01 2007
New Revision: 598285
URL: http://svn.apache.org/viewvc?rev=598285&view=rev
Log:
QPID-92, QPID-564 : Upgraded Mina to 1.0.1. This is still not good enough, but all later versions currently have a bug in the CumulativeProtocolDecoder: it compact()s the buffer, which breaks slices. Added MultiThread support, which is part of the feature set of QPID-564.
Added:
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java (with props)
incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java (with props)
Modified:
incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml
incubator/qpid/branches/M2.1.1/java/broker/pom.xml
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/branches/M2.1.1/java/common/pom.xml
Modified: incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml Mon Nov 26 06:16:01 2007
@@ -33,6 +33,7 @@
<keystorePassword>keystorepass</keystorePassword>
</ssl>-->
<qpidnio>true</qpidnio>
+ <protectio>true</protectio>
<transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
@@ -172,5 +173,6 @@
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
+
Modified: incubator/qpid/branches/M2.1.1/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/pom.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/pom.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/pom.xml Mon Nov 26 06:16:01 2007
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,16 +49,16 @@
<artifactId>log4j</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.4.0</version>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.4.0</version>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
</dependency>
<dependency>
@@ -208,7 +208,7 @@
<isset property="skip.python.tests"/>
</condition>
- <property name="command"
+ <property name="command"
value="python run-tests -v -I java_failing.txt -b localhost:2100"/>
<!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Nov 26 06:16:01 2007
@@ -436,7 +436,7 @@
}
}
- //fixme qpid.AMQP should be using qpidproperties to get value
+ //fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
}
Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Nov 26 06:16:01 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,15 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
@@ -51,7 +56,6 @@
*
* We delegate all frame (message) processing to the AMQProtocolSession which wraps
* the state for the connection.
- *
*/
public class AMQPFastProtocolHandler extends IoHandlerAdapter
{
@@ -115,11 +119,41 @@
}
}
+
+ // Optionally guard the session against unbounded read/write buffering (config key broker.connector.protectio).
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+ {
+     try
+     {
+         IoFilterChain chain = protocolSession.getFilterChain();
+
+         // Fall back to 32K when the session config does not expose a socket receive buffer size.
+         int buf_size = 32768;
+         if (protocolSession.getConfig() instanceof SocketSessionConfig)
+         {
+             buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
+         }
+
+         // WriteBufferLimitFilterBuilder.attach() looks for an ExecutorFilter to position itself against
+         // (ReadThrottleFilterBuilder presumably does the same), so a placeholder is added for the attach calls.
+         chain.addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+         try
+         {
+             ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+             readfilter.setMaximumConnectionBufferSize(buf_size);
+             readfilter.attach(chain);
+
+             WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+             writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+             writefilter.attach(chain);
+         }
+         finally
+         {
+             // Take the placeholder out even when an attach fails, so it never stays in the live chain.
+             chain.remove("tempExecutorFilterForFilterBuilder");
+         }
+
+         _logger.info("Using IO Read/Write Filter Protection");
+     }
+     catch (Exception e)
+     {
+         // Pass the exception itself so the stack trace is not lost.
+         _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage(), e);
+     }
+ }
}
- /**
- * Separated into its own, protected, method to allow easier reuse
- */
+ /** Separated into its own, protected, method to allow easier reuse */
protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
@@ -193,8 +227,10 @@
/**
* Invoked when a message is received on a particular protocol session. Note that a
* protocol session is directly tied to a particular physical connection.
+ *
* @param protocolSession the protocol session that received the message
- * @param message the message itself (i.e. a decoded frame)
+ * @param message the message itself (i.e. a decoded frame)
+ *
* @throws Exception if the message cannot be processed
*/
public void messageReceived(IoSession protocolSession, Object message) throws Exception
@@ -204,7 +240,7 @@
if (message instanceof AMQDataBlock)
{
amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-
+
}
else if (message instanceof ByteBuffer)
{
@@ -218,9 +254,11 @@
/**
* Called after a message has been sent out on a particular protocol session
+ *
* @param protocolSession the protocol session (i.e. connection) on which this
- * message was sent
- * @param object the message (frame) that was encoded and sent
+ * message was sent
+ * @param object the message (frame) that was encoded and sent
+ *
* @throws Exception if we want to indicate an error
*/
public void messageSent(IoSession protocolSession, Object object) throws Exception
Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Mon Nov 26 06:16:01 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,9 +23,12 @@
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
public class ConnectorConfiguration
{
+ private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class);
+
public static final String DEFAULT_PORT = "5672";
public static final String SSL_PORT = "8672";
@@ -41,7 +44,7 @@
@Configured(path = "connector.bind",
defaultValue = "wildcard")
public String bindAddress;
-
+
@Configured(path = "connector.socketReceiveBuffer",
defaultValue = "32767")
public int socketReceiveBufferSize;
@@ -69,29 +72,43 @@
@Configured(path = "connector.ssl.enabled",
defaultValue = "false")
public boolean enableSSL;
-
+
@Configured(path = "connector.ssl.sslOnly",
- defaultValue = "true")
+ defaultValue = "true")
public boolean sslOnly;
-
+
@Configured(path = "connector.ssl.port",
- defaultValue = SSL_PORT)
- public int sslPort;
-
+ defaultValue = SSL_PORT)
+ public int sslPort;
+
@Configured(path = "connector.ssl.keystorePath",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePath;
-
+
@Configured(path = "connector.ssl.keystorePassword",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePassword;
-
+
@Configured(path = "connector.ssl.certType",
- defaultValue = "SunX509")
+ defaultValue = "SunX509")
public String certType;
+ @Configured(path = "connector.qpidnio",
+ defaultValue = "true")
+ public boolean _multiThreadNIO;
+
+
public IoAcceptor createAcceptor()
{
- return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ if (_multiThreadNIO)
+ {
+ _logger.warn("Using Qpid Multithreaded IO Processing");
+ return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor());
+ }
+ else
+ {
+ _logger.warn("Using Mina IO Processing");
+ return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ }
}
}
Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Nov 26 06:16:01 2007
@@ -21,12 +21,16 @@
package org.apache.qpid.client.protocol;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -216,6 +220,36 @@
e.printStackTrace();
}
+ // IO protection is on unless the 'protectio' system property is present and not "true".
+ if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+ {
+     try
+     {
+         IoFilterChain chain = session.getFilterChain();
+
+         // Fall back to 32K when the session config does not expose a socket receive buffer size.
+         int buf_size = 32768;
+         if (session.getConfig() instanceof SocketSessionConfig)
+         {
+             buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+         }
+
+         // WriteBufferLimitFilterBuilder.attach() looks for an ExecutorFilter to position itself against
+         // (ReadThrottleFilterBuilder presumably does the same), so a placeholder is added for the attach calls.
+         chain.addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+         try
+         {
+             ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+             readfilter.setMaximumConnectionBufferSize(buf_size);
+             readfilter.attach(chain);
+
+             WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+             writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+             writefilter.attach(chain);
+         }
+         finally
+         {
+             // Take the placeholder out even when an attach fails, so it never stays in the live chain.
+             chain.remove("tempExecutorFilterForFilterBuilder");
+         }
+
+         _logger.info("Using IO Read/Write Filter Protection");
+     }
+     catch (Exception e)
+     {
+         // Pass the exception itself so the stack trace is not lost.
+         _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage(), e);
+     }
+ }
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Mon Nov 26 06:16:01 2007
@@ -23,14 +23,13 @@
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,40 +89,41 @@
switch (transport)
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
{
- public IoConnector newSocketConnector()
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ // Multi-threaded NIO is used when 'qpidnio' is unset (the default) or explicitly set to true.
+ if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
+ {
+     // Property present means the user asked for it; property absent means the default applied.
+     _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+                                                  ? "Sysproperty 'qpidnio' is set"
+                                                  : "Qpid NIO is new default"));
+     result = new MultiThreadSocketConnector();
+ }
+ else
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
- // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
- }
- // else
-
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
-
- return result;
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
}
- });
- break;
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
+ }
+ });
+ break;
+
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -145,7 +145,7 @@
}
private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
- throws AMQVMBrokerCreationException
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -160,7 +160,7 @@
else
{
throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
+ + " does not exist. Auto create disabled.", null);
}
}
}
@@ -257,8 +257,8 @@
IoHandlerAdapter provider;
try
{
- Class[] cnstr = { Integer.class };
- Object[] params = { port };
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
@@ -277,7 +277,7 @@
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
amqbce.initCause(e);
throw amqbce;
}
Modified: incubator/qpid/branches/M2.1.1/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/pom.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/pom.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/common/pom.xml Mon Nov 26 06:16:01 2007
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,7 +45,7 @@
</properties>
<build>
- <plugins>
+ <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@@ -114,14 +114,14 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.4.0</version>
+ <version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.4.0</version>
- <scope>test</scope>
+ <scope>test</scope>
</dependency>
<dependency>
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,48 @@
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IoFilter;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Unchecked exception that can carry the {@link IoFilter.WriteRequest} it relates to.
+ * The request is optional and may be replaced after construction.
+ */
+public class WriteBufferFullExeception extends RuntimeException
+{
+    /** The write request associated with this exception; may be null. */
+    private IoFilter.WriteRequest _writeRequest;
+
+    /** Creates the exception without an associated write request. */
+    public WriteBufferFullExeception()
+    {
+        this(null);
+    }
+
+    /**
+     * Creates the exception for the given write request.
+     *
+     * @param writeRequest the request to associate with this exception, or null for none
+     */
+    public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
+    {
+        super();
+        this._writeRequest = writeRequest;
+    }
+
+    /** @return the write request associated with this exception, or null if none was set */
+    public IoFilter.WriteRequest getWriteRequest()
+    {
+        return this._writeRequest;
+    }
+
+    /** @param writeRequest the request to associate with this exception from now on */
+    public void setWriteRequest(IoFilter.WriteRequest writeRequest)
+    {
+        this._writeRequest = writeRequest;
+    }
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This filter will turn the asynchronous filterWrite method into a blocking send when there are more than
+ * the prescribed number of messages awaiting filterWrite. On a server it should be used in conjunction with
+ * the {@link ReadThrottleFilterBuilder}, because while writes are blocked the read thread can otherwise build
+ * up a backlog of unprocessed messages and cause an Out of Memory error.
+ *
+ * This should only be viewed as a temporary workaround for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter, as this issue will always occur on a machine
+ * where the network is slower than the local producer.
+ *
+ * A suggested improvement is to allow the implementation of policies on what to do when the buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class WriteBufferLimitFilterBuilder
+{
+    /** Session attribute under which the number of bytes currently awaiting write is kept, as a Long. */
+    public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
+
+    /** Pending-message limit applied by the no-argument constructor. */
+    private static final int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
+
+    // NOTE(review): nothing visible in this class ever sets this flag, so writers always block.
+    private volatile boolean throwNotBlock = false;
+
+    // A limit of 0 switches that limit off (see sendAllowed); each setter zeroes the other limit.
+    private volatile int maximumConnectionBufferCount;
+    private volatile long maximumConnectionBufferSize;
+
+    /** Monitor that blocked writers wait on; shared by every SendLimit created by this builder. */
+    private final Object _blockLock = new Object();
+
+    /** Number of threads currently inside waitTillSendAllowed; only touched while holding _blockLock. */
+    private int _blockWaiters = 0;
+
+
+    /** Creates a builder that limits each session to the default number (5000) of pending write requests. */
+    public WriteBufferLimitFilterBuilder()
+    {
+        this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
+    }
+
+    /**
+     * Creates a builder that limits each session to the given number of pending write requests.
+     *
+     * @param maxWriteBufferSize despite its name this is a message count: it is handed to
+     *                           {@link #setMaximumConnectionBufferCount(int)}
+     */
+    public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
+    {
+        setMaximumConnectionBufferCount(maxWriteBufferSize);
+    }
+
+
+    /**
+     * Set the maximum number of pending items in the writeQueue for a given session, and switch the
+     * byte-size limit off. The new value is read the next time a write passes through the filter,
+     * including on existing connections. Default value is 5000 msgs.
+     *
+     * @param maximumConnectionBufferCount New message limit. A value of 0 disables the count limit.
+     */
+    public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
+    {
+        this.maximumConnectionBufferCount = maximumConnectionBufferCount;
+        this.maximumConnectionBufferSize = 0;
+    }
+
+    /**
+     * Set the maximum number of bytes that may be awaiting write for a given session, and switch the
+     * message-count limit off.
+     *
+     * @param maximumConnectionBufferSize New byte limit. A value of 0 disables the size limit.
+     */
+    public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
+    {
+        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+        this.maximumConnectionBufferCount = 0;
+    }
+
+    /**
+     * Attach this filter to the specified filter chain. It searches the chain for an
+     * {@link ExecutorFilter} and inserts a new {@link SendLimit} immediately before it.
+     *
+     * @param chain {@link IoFilterChain} to attach self to.
+     * @throws IllegalStateException if the chain contains no ExecutorFilter
+     */
+    public void attach(IoFilterChain chain)
+    {
+        String name = getThreadPoolFilterEntryName(chain.getAll());
+
+        chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+    }
+
+    /**
+     * Attach this filter to the specified builder. It searches the builder for an
+     * {@link ExecutorFilter} and inserts a new {@link SendLimit} immediately before it.
+     *
+     * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
+     * @throws IllegalStateException if the builder contains no ExecutorFilter
+     */
+    public void attach(DefaultIoFilterChainBuilder builder)
+    {
+        String name = getThreadPoolFilterEntryName(builder.getAll());
+
+        builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+    }
+
+    /**
+     * Finds the first entry whose filter is an {@link ExecutorFilter}.
+     *
+     * @param entries the {@link IoFilterChain.Entry} objects to search, in order
+     * @return the name under which that entry is registered
+     * @throws IllegalStateException if no entry holds an ExecutorFilter
+     */
+    private String getThreadPoolFilterEntryName(List entries)
+    {
+        for (Object o : entries)
+        {
+            IoFilterChain.Entry entry = (IoFilterChain.Entry) o;
+
+            // instanceof accepts ExecutorFilter and its subclasses; the former reversed
+            // isAssignableFrom test accepted supertypes of ExecutorFilter instead.
+            if (entry.getFilter() instanceof ExecutorFilter)
+            {
+                return entry.getName();
+            }
+        }
+
+        throw new IllegalStateException("Chain does not contain a ExecutorFilter");
+    }
+
+
+    /**
+     * The filter that enforces the limits. It reads the limits, the lock and the waiter count of the
+     * enclosing builder, so every SendLimit created by one builder shares them.
+     */
+    public class SendLimit extends IoFilterAdapter
+    {
+        /**
+         * Holds the calling thread until the session is within its limits, then adds the size of the
+         * message (when it is a ByteBuffer) to the session's pending total and passes the write on.
+         */
+        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+        {
+            try
+            {
+                waitTillSendAllowed(session);
+            }
+            catch (WriteBufferFullExeception wbfe)
+            {
+                // NOTE(review): the failure is reported but the write below still goes ahead - confirm intended.
+                nextFilter.exceptionCaught(session, wbfe);
+            }
+
+            if (writeRequest.getMessage() instanceof ByteBuffer)
+            {
+                increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
+            }
+
+            nextFilter.filterWrite(session, writeRequest);
+        }
+
+        /** Adds the remaining bytes of the message to the session's pending total. */
+        private void increasePendingWriteSize(IoSession session, ByteBuffer message)
+        {
+            // The read-modify-write of the attribute is made atomic by locking on the session.
+            synchronized (session)
+            {
+                Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
+                session.setAttribute(PENDING_SIZE, pendingSize);
+            }
+        }
+
+        /**
+         * @return true if the session is closing, or if it is below every limit that is switched on
+         */
+        private boolean sendAllowed(IoSession session)
+        {
+            if (session.isClosing())
+            {
+                return true;
+            }
+
+            // Read each volatile limit once so the zero check and the comparison see the same value.
+            int lmswm = maximumConnectionBufferCount;
+            long lmswb = maximumConnectionBufferSize;
+
+            return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
+                   && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
+        }
+
+        /** @return the session's pending byte total, or 0 when none has been recorded yet */
+        private long getScheduledWriteBytes(IoSession session)
+        {
+            synchronized (session)
+            {
+                Long i = (Long) session.getAttribute(PENDING_SIZE);
+                return null == i ? 0 : i;
+            }
+        }
+
+        /**
+         * Blocks the calling thread until {@code sendAllowed(session)} is true. An interrupt received
+         * while waiting does not end the wait; it is remembered and the thread's interrupt status is
+         * restored before this method returns.
+         *
+         * @throws WriteBufferFullExeception if the builder is set to throw rather than block
+         */
+        private void waitTillSendAllowed(IoSession session)
+        {
+            boolean interrupted = false;
+
+            synchronized (_blockLock)
+            {
+                // NOTE(review): this throws whenever the flag is set, without checking whether the
+                // buffer is actually full - confirm before enabling the flag.
+                if (throwNotBlock)
+                {
+                    throw new WriteBufferFullExeception();
+                }
+
+                _blockWaiters++;
+                try
+                {
+                    while (!sendAllowed(session))
+                    {
+                        try
+                        {
+                            _blockLock.wait();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // Keep waiting, but do not lose the interrupt: it is re-asserted below.
+                            interrupted = true;
+                        }
+                    }
+                }
+                finally
+                {
+                    // Keep the waiter count correct even if sendAllowed() throws.
+                    _blockWaiters--;
+                }
+            }
+
+            if (interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Subtracts the message's size (when it is a ByteBuffer) from the session's pending total,
+         * wakes any blocked writers so they re-check the limits, and passes the event on.
+         */
+        public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
+        {
+            if (message instanceof ByteBuffer)
+            {
+                decrementPendingWriteSize(session, (ByteBuffer) message);
+            }
+            notifyWaitingWriters();
+            nextFilter.messageSent(session, message);
+        }
+
+        /** Subtracts the remaining bytes of the message from the session's pending total. */
+        private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
+        {
+            synchronized (session)
+            {
+                session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
+            }
+        }
+
+        /** Wakes every thread waiting on the shared lock, if there are any. */
+        private void notifyWaitingWriters()
+        {
+            synchronized (_blockLock)
+            {
+                if (_blockWaiters != 0)
+                {
+                    _blockLock.notifyAll();
+                }
+            }
+
+        }
+
+    }// SendLimit
+
+
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message,
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends OurCumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; OurCumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+    /** Session attribute key under which not-yet-decoded bytes are kept between calls. */
+    private static final String BUFFER =
+        OurCumulativeProtocolDecoder.class.getName() + ".Buffer";
+
+    /**
+     * Creates a new instance.
+     */
+    protected OurCumulativeProtocolDecoder() {
+    }
+
+    /**
+     * Appends <tt>in</tt> to the session's cumulative buffer (or uses <tt>in</tt> directly
+     * when no cumulative buffer exists yet) and hands the data to
+     * {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)} repeatedly until it
+     * returns <tt>false</tt> or the buffer is exhausted.
+     * <p>
+     * The cumulative buffer is NOT compacted. Any bytes left over are copied into a newly
+     * allocated buffer which becomes the session's cumulative buffer; when nothing is left
+     * and a cumulative buffer was in use, it is removed from the session and released.
+     * A cumulative buffer that gets replaced by the copy is not released by this method.
+     * NOTE(review): presumably this keeps slices handed out by <tt>doDecode()</tt> valid -
+     * confirm.
+     *
+     * @param session the session the data was received on
+     * @param in      the newly received data
+     * @param out     the output that receives decoded messages
+     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> not consuming the cumulative buffer.
+     * @throws Exception             if <tt>doDecode()</tt> fails
+     */
+    public void decode(IoSession session, ByteBuffer in,
+                       ProtocolDecoderOutput out) throws Exception {
+        final ByteBuffer sessionBuffer = (ByteBuffer) session.getAttribute(BUFFER);
+        final boolean usingSessionBuffer = sessionBuffer != null;
+
+        // With a session buffer present the new data is appended to it; otherwise the
+        // buffer read from the network is decoded in place.
+        final ByteBuffer buf;
+        if (usingSessionBuffer) {
+            sessionBuffer.put(in);
+            sessionBuffer.flip();
+            buf = sessionBuffer;
+        } else {
+            buf = in;
+        }
+
+        boolean keepDecoding = true;
+        while (keepDecoding) {
+            final int startPosition = buf.position();
+            if (!doDecode(session, buf, out)) {
+                break;
+            }
+
+            if (buf.position() == startPosition) {
+                throw new IllegalStateException(
+                    "doDecode() can't return true when buffer is not consumed.");
+            }
+
+            keepDecoding = buf.hasRemaining();
+        }
+
+        // Undecodable leftovers are parked in the session so that the next call can
+        // append to them.
+        if (buf.hasRemaining()) {
+            storeRemainingInSession(buf, session);
+        } else if (usingSessionBuffer) {
+            removeSessionBuffer(session);
+        }
+    }
+
+    /**
+     * Implement this method to consume the specified cumulative buffer and
+     * decode its content into message(s).
+     *
+     * @param session the session the data belongs to
+     * @param in      the cumulative buffer
+     * @param out     the output that receives decoded messages
+     * @return <tt>true</tt> if and only if there's more to decode in the buffer
+     *         and you want to have <tt>doDecode</tt> method invoked again.
+     *         Return <tt>false</tt> if remaining data is not enough to decode,
+     *         then this method will be invoked again when more data is cumulated.
+     * @throws Exception if cannot decode <tt>in</tt>.
+     */
+    protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+                                        ProtocolDecoderOutput out) throws Exception;
+
+    /**
+     * Releases the cumulative buffer used by the specified <tt>session</tt>.
+     * Please don't forget to call <tt>super.dispose( session )</tt> when
+     * you override this method.
+     */
+    public void dispose(IoSession session) throws Exception {
+        removeSessionBuffer(session);
+    }
+
+    /** Detaches the cumulative buffer from the session and releases it, if there is one. */
+    private void removeSessionBuffer(IoSession session) {
+        final ByteBuffer detached = (ByteBuffer) session.removeAttribute(BUFFER);
+        if (detached == null) {
+            return;
+        }
+        detached.release();
+    }
+
+    /**
+     * Copies the unread part of <tt>buf</tt> into a new auto-expanding buffer with the same
+     * capacity and byte order, and stores that buffer as the session's cumulative buffer.
+     */
+    private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+        final ByteBuffer leftover = ByteBuffer.allocate(buf.capacity());
+        leftover.setAutoExpand(true);
+        leftover.order(buf.order());
+        leftover.put(buf);
+
+        session.setAttribute(BUFFER, leftover);
+    }
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.NamePreservingRunnable;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketAcceptor extends SocketAcceptor
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Executor executor;
+ private final Object lock = new Object();
+ private final int id = nextId ++;
+ private final String threadName = "SocketAcceptor-" + id;
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+
+ /**
+ * Creates an acceptor with a single I/O processor, using a {@link NewThreadExecutor}
+ * to launch its threads. Equivalent to
+ * <code>new MultiThreadSocketAcceptor( 1, new NewThreadExecutor() )</code>.
+ */
+ public MultiThreadSocketAcceptor()
+ {
+ this( 1, new NewThreadExecutor() );
+ }
+
+ /**
+ * Create an acceptor with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketAcceptor( int processorCount, Executor executor )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
+ }
+ }
+
+
+ /**
+  * Binds to the specified <code>address</code> and handles incoming connections with the specified
+  * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
+  * <p>
+  * The request is queued for the acceptor worker (which is started if necessary) and this
+  * method blocks until the worker has processed it. If the calling thread is interrupted
+  * while waiting, the interruption is reported to the {@link ExceptionMonitor}, the wait
+  * continues, and the thread's interrupt status is restored before this method returns.
+  *
+  * @param address the address to bind to; may be <code>null</code>, otherwise it must be an
+  *                {@link InetSocketAddress}
+  * @param handler the handler for sessions accepted on this address
+  * @param config  the service configuration; the default configuration is used when
+  *                <code>null</code>
+  * @throws NullPointerException     if <code>handler</code> is <code>null</code>
+  * @throws IllegalArgumentException if <code>address</code> is not an {@link InetSocketAddress}
+  * @throws IOException if failed to bind
+  */
+ public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+ {
+     if( handler == null )
+     {
+         throw new NullPointerException( "handler" );
+     }
+
+     if( address != null && !( address instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+     }
+
+     if( config == null )
+     {
+         config = getDefaultConfig();
+     }
+
+     RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+     // Queue first, then make sure a worker exists: a worker only shuts down when the
+     // queue is empty, so it cannot disappear before it has seen this request.
+     synchronized( registerQueue )
+     {
+         registerQueue.push( request );
+     }
+
+     startupWorker();
+
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+                 // Remember the interruption; re-asserting it right away would make
+                 // the next wait() fail immediately.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         // Do not swallow the interruption: hand it back to the caller.
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         throw request.exception;
+     }
+ }
+
+
+ /**
+  * Makes sure the acceptor worker is running. When no worker exists, a selector is opened,
+  * a new worker is created and handed to the executor; otherwise nothing happens.
+  *
+  * @throws IOException if the selector cannot be opened
+  */
+ private synchronized void startupWorker() throws IOException
+ {
+     synchronized( lock )
+     {
+         if( worker != null )
+         {
+             // A worker is already in place.
+             return;
+         }
+
+         selector = Selector.open();
+         worker = new Worker();
+
+         executor.execute( new NamePreservingRunnable( worker ) );
+     }
+ }
+
+ /**
+  * Unbinds the specified <code>address</code>. The request is queued for the acceptor
+  * worker and this method blocks until the worker has processed it. If the calling thread
+  * is interrupted while waiting, the interruption is reported to the
+  * {@link ExceptionMonitor}, the wait continues, and the thread's interrupt status is
+  * restored before this method returns.
+  *
+  * @param address the address to unbind
+  * @throws NullPointerException     if <code>address</code> is <code>null</code>
+  * @throws IllegalArgumentException if nothing is bound to <code>address</code>
+  */
+ public void unbind( SocketAddress address )
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+
+     CancellationRequest request = new CancellationRequest( address );
+
+     try
+     {
+         startupWorker();
+     }
+     catch( IOException e )
+     {
+         // IOException is thrown only when Worker thread is not
+         // running and failed to open a selector. We simply throw
+         // IllegalArgumentException here because we can simply
+         // conclude that nothing is bound to the selector.
+         throw new IllegalArgumentException( "Address not bound: " + address );
+     }
+
+     synchronized( cancelQueue )
+     {
+         cancelQueue.push( request );
+     }
+
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+                 // Remember the interruption; re-asserting it right away would make
+                 // the next wait() fail immediately.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         // Do not swallow the interruption: hand it back to the caller.
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         // The exception was created on the worker thread; refresh the stack trace
+         // so that it points at the caller of unbind().
+         request.exception.fillInStackTrace();
+
+         throw request.exception;
+     }
+ }
+
+
+ private class Worker implements Runnable
+ {
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
+
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+
+ MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
+ MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
+ req.config, ch, req.handler, req.address );
+
+ // New Interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// SocketAcceptor.this, nextProcessor(), getListeners(),
+// req.config, ch, req.handler, req.address );
+
+
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ session.getIoProcessor().addNew( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ SocketAcceptorConfig cfg;
+ if( req.config instanceof SocketAcceptorConfig )
+ {
+ cfg = ( SocketAcceptorConfig ) req.config;
+ }
+ else
+ {
+ cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+ }
+
+ ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+ ssc.socket().setReceiveBufferSize(
+ ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+ // and bind.
+ ssc.socket().bind( req.address, cfg.getBacklog() );
+ if( req.address == null || req.address.getPort() == 0 )
+ {
+ req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
+ }
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ synchronized( channels )
+ {
+ channels.put( req.address, ssc );
+ }
+
+ getListeners().fireServiceActivated(
+ this, req.address, req.handler, req.config );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notifyAll();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc;
+ synchronized( channels )
+ {
+ ssc = ( ServerSocketChannel ) channels.remove( request.address );
+ }
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+ request.registrationRequest = ( RegistrationRequest ) key.attachment();
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notifyAll();
+ }
+
+ if( request.exception == null )
+ {
+ getListeners().fireServiceDeactivated(
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
+ }
+ }
+ }
+ }
+
+ /**
+  * A bind request passed from bind() to the acceptor worker. The worker fills in the
+  * outcome and notifies the request object once it is done.
+  */
+ private static class RegistrationRequest
+ {
+     // Replaced by the worker with the socket's local address when it was null or had port 0.
+     private InetSocketAddress address;
+     private final IoHandler handler;
+     private final IoServiceConfig config;
+     // Set by the worker when binding failed.
+     private IOException exception;
+     // Set by the worker, under the request's monitor, once the request has been processed.
+     private boolean done;
+
+     private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+     {
+         this.handler = handler;
+         this.config = config;
+         this.address = ( InetSocketAddress ) address;
+     }
+ }
+
+
+ /**
+  * An unbind request passed from unbind() to the acceptor worker. The worker fills in the
+  * outcome and notifies the request object once it is done.
+  */
+ private static class CancellationRequest
+ {
+     private final SocketAddress address;
+     // Set by the worker, under the request's monitor, once the request has been processed.
+     private boolean done;
+     // The bind request that was attached to the cancelled selection key.
+     private RegistrationRequest registrationRequest;
+     // Set by the worker when nothing was bound to the address.
+     private RuntimeException exception;
+
+     private CancellationRequest( SocketAddress addressToUnbind )
+     {
+         address = addressToUnbind;
+     }
+ }
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketConnector extends SocketConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+ private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+ private final Queue connectQueue = new Queue();
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+
+ /**
+ * Creates a connector with a single I/O processor, using a {@link NewThreadExecutor}
+ * to launch its threads. Equivalent to
+ * <code>new MultiThreadSocketConnector(1, new NewThreadExecutor())</code>.
+ */
+ public MultiThreadSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * Returns how many seconds the connection worker thread stays alive without any
+ * registered connection attempt before it terminates itself. The field starts out at 60.
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ * @throws IllegalArgumentException if <code>workerTimeout</code> is negative
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ /**
+  * Connects to the given remote address without binding to a particular local address.
+  * Delegates to the four-argument overload, passing <code>null</code> as the local address.
+  *
+  * @param address the remote address to connect to
+  * @param handler the handler for the new session
+  * @param config  the service configuration
+  * @return the future of the connection attempt
+  */
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+     final SocketAddress noLocalAddress = null;
+     return connect(address, noLocalAddress, handler, config);
+ }
+
+ /**
+  * Starts a non-blocking connection attempt to <code>address</code>, optionally bound to
+  * <code>localAddress</code>.
+  * <p>
+  * If the channel connects immediately, the session is created right away. Otherwise a
+  * connection request is queued for the connector worker, which is started if necessary.
+  * The request is queued and the selector woken up while the connector lock is held,
+  * because the worker decides under that same lock whether it may shut down: this way it
+  * cannot drop the selector between the start-up check and the wake-up call.
+  *
+  * @param address      the remote address; must be an {@link InetSocketAddress}
+  * @param localAddress the local address to bind to, or <code>null</code>; if given it must
+  *                     be an {@link InetSocketAddress}
+  * @param handler      the handler for the new session
+  * @param config       the service configuration; the default configuration is used when
+  *                     <code>null</code>
+  * @return the future of the connection attempt; an already failed future if opening,
+  *         binding or connecting the channel, or starting the worker, threw an
+  *         {@link IOException}
+  * @throws NullPointerException     if <code>address</code> or <code>handler</code> is
+  *                                  <code>null</code>
+  * @throws IllegalArgumentException if an address is not an {@link InetSocketAddress}
+  */
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+                              IoHandler handler, IoServiceConfig config)
+ {
+     if (address == null)
+     {
+         throw new NullPointerException("address");
+     }
+     if (handler == null)
+     {
+         throw new NullPointerException("handler");
+     }
+
+     if (!(address instanceof InetSocketAddress))
+     {
+         throw new IllegalArgumentException("Unexpected address type: "
+                                            + address.getClass());
+     }
+
+     if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+     {
+         throw new IllegalArgumentException("Unexpected local address type: "
+                                            + localAddress.getClass());
+     }
+
+     if (config == null)
+     {
+         config = getDefaultConfig();
+     }
+
+     SocketChannel ch = null;
+     boolean success = false;
+     try
+     {
+         ch = SocketChannel.open();
+         ch.socket().setReuseAddress(true);
+         if (localAddress != null)
+         {
+             ch.socket().bind(localAddress);
+         }
+
+         ch.configureBlocking(false);
+
+         if (ch.connect(address))
+         {
+             // Connected on the spot: no need to involve the selector.
+             DefaultConnectFuture future = new DefaultConnectFuture();
+             newSession(ch, handler, config, future);
+             success = true;
+             return future;
+         }
+
+         success = true;
+     }
+     catch (IOException e)
+     {
+         return DefaultConnectFuture.newFailedFuture(e);
+     }
+     finally
+     {
+         if (!success && ch != null)
+         {
+             try
+             {
+                 ch.close();
+             }
+             catch (IOException e)
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught(e);
+             }
+         }
+     }
+
+     ConnectionRequest request = new ConnectionRequest(ch, handler, config);
+     synchronized (lock)
+     {
+         try
+         {
+             startupWorker();
+         }
+         catch (IOException e)
+         {
+             try
+             {
+                 ch.close();
+             }
+             catch (IOException e2)
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught(e2);
+             }
+
+             return DefaultConnectFuture.newFailedFuture(e);
+         }
+
+         // Still under the lock: the worker cannot have closed the selector, and once
+         // the request is queued the worker will not shut down before handling it.
+         synchronized (connectQueue)
+         {
+             connectQueue.push(request);
+         }
+         selector.wakeup();
+     }
+
+     return request;
+ }
+
+ /**
+  * Makes sure the connector worker is running. When no worker exists, a selector is
+  * opened, a new worker is created and handed to the executor; otherwise nothing happens.
+  *
+  * @throws IOException if the selector cannot be opened
+  */
+ private synchronized void startupWorker() throws IOException
+ {
+     if (worker != null)
+     {
+         // A worker is already in place.
+         return;
+     }
+
+     selector = Selector.open();
+     worker = new Worker();
+     executor.execute(new NamePreservingRunnable(worker));
+ }
+
+ /**
+  * Registers the channel of every queued connection request with the selector for
+  * connect events, attaching the request to the selection key.
+  * <p>
+  * If registration fails, the request is completed with the exception and the channel is
+  * closed: it never reaches the selector, so no later processing step would close it.
+  */
+ private void registerNew()
+ {
+     if (connectQueue.isEmpty())
+     {
+         return;
+     }
+
+     for (; ;)
+     {
+         ConnectionRequest req;
+         synchronized (connectQueue)
+         {
+             req = (ConnectionRequest) connectQueue.pop();
+         }
+
+         if (req == null)
+         {
+             break;
+         }
+
+         SocketChannel ch = req.channel;
+         try
+         {
+             ch.register(selector, SelectionKey.OP_CONNECT, req);
+         }
+         catch (IOException e)
+         {
+             req.setException(e);
+
+             // Do not leak the socket of a request that could not be registered.
+             try
+             {
+                 ch.close();
+             }
+             catch (IOException e2)
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught(e2);
+             }
+         }
+     }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ MultiThreadSocketSessionImpl session =
+ new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
+ config, ch, handler, ch.socket().getRemoteSocketAddress());
+
+ //new interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// this, nextProcessor(), getListeners(),
+// config, ch, handler, ch.socket().getRemoteSocketAddress() );
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+ session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture);
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew(session);
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * A pending connection attempt. It is also the {@link ConnectFuture} returned to the
+  * caller of connect(), and it is attached to the channel's selection key.
+  */
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+     private final SocketChannel channel;
+     // Point in time (System.currentTimeMillis() based) after which the attempt is failed.
+     private final long deadline;
+     private final IoHandler handler;
+     private final IoServiceConfig config;
+
+     private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+     {
+         this.channel = channel;
+         this.handler = handler;
+         this.config = config;
+
+         // The connect timeout comes from the given configuration if it is a connector
+         // configuration, otherwise from the connector's default configuration.
+         final IoConnectorConfig timeoutSource = (config instanceof IoConnectorConfig)
+                 ? (IoConnectorConfig) config
+                 : (IoConnectorConfig) getDefaultConfig();
+         final long timeout = timeoutSource.getConnectTimeoutMillis();
+         this.deadline = System.currentTimeMillis() + timeout;
+     }
+ }
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for socket transport (TCP/IP) used with
+ * {@link MultiThreadSocketSessionImpl} sessions. Writes reaching the end of the
+ * chain are queued on the session, and close requests are handed to the
+ * session's I/O processor.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
+
+    /**
+     * Creates a filter chain bound to the given session.
+     *
+     * @param parent the session this chain belongs to
+     */
+    MultiThreadSocketFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+
+    /**
+     * Appends the write request to the session's write request queue and, if
+     * this request is the only one queued and the session's traffic mask
+     * allows writing, asks the session's I/O processor to flush the session.
+     *
+     * @param session      the session to write to; must be a
+     *                     {@link MultiThreadSocketSessionImpl}
+     * @param writeRequest the request to queue; its message must be a
+     *                     {@link ByteBuffer}
+     */
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        final MultiThreadSocketSessionImpl socketSession = (MultiThreadSocketSessionImpl) session;
+        final Queue pendingWrites = socketSession.getWriteRequestQueue();
+
+        // Remember the buffer's current position. The I/O processor's flush is
+        // expected to reset it once the write has finished, because the same
+        // buffer is passed on with the messageSent event.
+        final ByteBuffer payload = (ByteBuffer) writeRequest.getMessage();
+        payload.mark();
+
+        synchronized( pendingWrites )
+        {
+            pendingWrites.push( writeRequest );
+
+            // Only the request that turned an empty queue into a non-empty one
+            // needs to notify the I/O processor.
+            final boolean queueWasEmpty = pendingWrites.size() == 1;
+            if( queueWasEmpty && session.getTrafficMask().isWritable() )
+            {
+                socketSession.getIoProcessor().flush( socketSession );
+            }
+        }
+    }
+
+    /**
+     * Closes the session by asking its I/O processor to remove it.
+     *
+     * @param session the session to close; must be a
+     *                {@link MultiThreadSocketSessionImpl}
+     * @throws IOException declared for callers; not thrown directly by this method
+     */
+    protected void doClose( IoSession session ) throws IOException
+    {
+        final MultiThreadSocketSessionImpl socketSession = (MultiThreadSocketSessionImpl) session;
+        socketSession.getIoProcessor().remove( socketSession );
+    }
+}
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
------------------------------------------------------------------------------
svn:keywords = Rev Date