You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/26 15:16:02 UTC

svn commit: r598285 [1/2] - in /incubator/qpid/branches/M2.1.1/java: broker/ broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/transport/ client/sr...

Author: ritchiem
Date: Mon Nov 26 06:16:01 2007
New Revision: 598285

URL: http://svn.apache.org/viewvc?rev=598285&view=rev
Log:
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564


Added:
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java   (with props)
    incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java   (with props)
Modified:
    incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml
    incubator/qpid/branches/M2.1.1/java/broker/pom.xml
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/M2.1.1/java/common/pom.xml

Modified: incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/etc/config.xml Mon Nov 26 06:16:01 2007
@@ -33,6 +33,7 @@
             <keystorePassword>keystorepass</keystorePassword>
         </ssl>-->
         <qpidnio>true</qpidnio>
+        <protectio>true</protectio>
         <transport>nio</transport>
         <port>5672</port>
         <sslport>8672</sslport>
@@ -172,5 +173,6 @@
 
     <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
 </broker>
+
 
 

Modified: incubator/qpid/branches/M2.1.1/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/pom.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/pom.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/pom.xml Mon Nov 26 06:16:01 2007
@@ -6,9 +6,9 @@
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License. You may obtain a copy of the License at
-    
+
     http://www.apache.org/licenses/LICENSE-2.0
-    
+
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,16 +49,16 @@
             <artifactId>log4j</artifactId>
         </dependency>
 
-        <dependency>  
+        <dependency>
             <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>  
-            <version>1.4.0</version> 
+            <artifactId>slf4j-api</artifactId>
+            <version>1.4.0</version>
         </dependency>
 
-        <dependency>  
-            <groupId>org.slf4j</groupId> 
-            <artifactId>slf4j-log4j12</artifactId>  
-            <version>1.4.0</version>  
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.4.0</version>
         </dependency>
 
         <dependency>
@@ -208,7 +208,7 @@
                                         <isset property="skip.python.tests"/>
                                     </condition>
 
-                                    <property name="command" 
+                                    <property name="command"
                                               value="python run-tests -v -I java_failing.txt -b localhost:2100"/>
                                     <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
 

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Nov 26 06:16:01 2007
@@ -436,7 +436,7 @@
                 }
             }
 
-            //fixme  qpid.AMQP should be using qpidproperties to get value            
+            //fixme  qpid.AMQP should be using qpidproperties to get value
             _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
                                + " build: " + QpidProperties.getBuildVersion());
         }

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Nov 26 06:16:01 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,15 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.SessionUtil;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
@@ -51,7 +56,6 @@
  *
  * We delegate all frame (message) processing to the AMQProtocolSession which wraps
  * the state for the connection.
- *
  */
 public class AMQPFastProtocolHandler extends IoHandlerAdapter
 {
@@ -115,11 +119,41 @@
             }
 
         }
+
+        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+        {
+            try
+            {
+//        //Add IO Protection Filters
+                IoFilterChain chain = protocolSession.getFilterChain();
+
+                int buf_size = 32768;
+                if (protocolSession.getConfig() instanceof SocketSessionConfig)
+                {
+                    buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
+                }
+
+                protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.attach(chain);
+
+                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.attach(chain);
+
+                protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+                _logger.info("Using IO Read/Write Filter Protection");
+            }
+            catch (Exception e)
+            {
+                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+            }
+        }
     }
 
-    /**
-     * Separated into its own, protected, method to allow easier reuse
-     */
+    /** Separated into its own, protected, method to allow easier reuse */
     protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
     {
         new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
@@ -193,8 +227,10 @@
     /**
      * Invoked when a message is received on a particular protocol session. Note that a
      * protocol session is directly tied to a particular physical connection.
+     *
      * @param protocolSession the protocol session that received the message
-     * @param message the message itself (i.e. a decoded frame)
+     * @param message         the message itself (i.e. a decoded frame)
+     *
      * @throws Exception if the message cannot be processed
      */
     public void messageReceived(IoSession protocolSession, Object message) throws Exception
@@ -204,7 +240,7 @@
         if (message instanceof AMQDataBlock)
         {
             amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-                        
+
         }
         else if (message instanceof ByteBuffer)
         {
@@ -218,9 +254,11 @@
 
     /**
      * Called after a message has been sent out on a particular protocol session
+     *
      * @param protocolSession the protocol session (i.e. connection) on which this
-     * message was sent
-     * @param object the message (frame) that was encoded and sent
+     *                        message was sent
+     * @param object          the message (frame) that was encoded and sent
+     *
      * @throws Exception if we want to indicate an error
      */
     public void messageSent(IoSession protocolSession, Object object) throws Exception

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Mon Nov 26 06:16:01 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,9 +23,12 @@
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.util.NewThreadExecutor;
 import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
 
 public class ConnectorConfiguration
 {
+    private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class);
+
     public static final String DEFAULT_PORT = "5672";
 
     public static final String SSL_PORT = "8672";
@@ -41,7 +44,7 @@
     @Configured(path = "connector.bind",
                 defaultValue = "wildcard")
     public String bindAddress;
-    
+
     @Configured(path = "connector.socketReceiveBuffer",
                 defaultValue = "32767")
     public int socketReceiveBufferSize;
@@ -69,29 +72,43 @@
     @Configured(path = "connector.ssl.enabled",
                 defaultValue = "false")
     public boolean enableSSL;
-    
+
     @Configured(path = "connector.ssl.sslOnly",
-    		    defaultValue = "true")
+                defaultValue = "true")
     public boolean sslOnly;
-    
+
     @Configured(path = "connector.ssl.port",
-            defaultValue = SSL_PORT)
-    public int sslPort;    
-    
+                defaultValue = SSL_PORT)
+    public int sslPort;
+
     @Configured(path = "connector.ssl.keystorePath",
-    			defaultValue = "none")
+                defaultValue = "none")
     public String keystorePath;
-    
+
     @Configured(path = "connector.ssl.keystorePassword",
-    			defaultValue = "none")
+                defaultValue = "none")
     public String keystorePassword;
-    
+
     @Configured(path = "connector.ssl.certType",
-    			defaultValue = "SunX509")
+                defaultValue = "SunX509")
     public String certType;
 
+    @Configured(path = "connector.qpidnio",
+                defaultValue = "true")
+    public boolean _multiThreadNIO;
+
+
     public IoAcceptor createAcceptor()
     {
-        return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());     
+        if (_multiThreadNIO)
+        {
+            _logger.warn("Using Qpid Multithreaded IO Processing");
+            return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor());
+        }
+        else
+        {
+            _logger.warn("Using Mina IO Processing");
+            return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+        }
     }
 }

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Nov 26 06:16:01 2007
@@ -21,12 +21,16 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecException;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -216,6 +220,36 @@
             e.printStackTrace();
         }
 
+        if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+        {
+            try
+            {
+                //Add IO Protection Filters
+                IoFilterChain chain = session.getFilterChain();
+
+                int buf_size = 32768;
+                if (session.getConfig() instanceof SocketSessionConfig)
+                {
+                    buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+                }
+                session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.attach(chain);
+
+                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.attach(chain);
+                session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+                _logger.info("Using IO Read/Write Filter Protection");
+            }
+            catch (Exception e)
+            {
+                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+            }
+        }
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Mon Nov 26 06:16:01 2007
@@ -23,14 +23,13 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,40 +89,41 @@
         switch (transport)
         {
 
-        case TCP:
-            _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+            case TCP:
+                _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                {
+                    public IoConnector newSocketConnector()
                     {
-                        public IoConnector newSocketConnector()
+                        SocketConnector result;
+                        // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+                        if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
+                        {
+                            _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+                                                                 ? "Qpid NIO is new default"
+                                                                 : "Sysproperty 'qpidnio' is set"));
+                            result = new MultiThreadSocketConnector();
+                        }
+                        else
                         {
-                            SocketConnector result;
-                            // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
-                            if (Boolean.getBoolean("qpidnio"))
-                            {
-                                _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
-                                // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
-                            }
-                            // else
-
-                            {
-                                _logger.info("Using Mina NIO");
-                                result = new SocketConnector(); // non-blocking connector
-                            }
-
-                            // Don't have the connector's worker thread wait around for other connections (we only use
-                            // one SocketConnector per connection at the moment anyway). This allows short-running
-                            // clients (like unit tests) to complete quickly.
-                            result.setWorkerTimeout(0);
-
-                            return result;
+                            _logger.info("Using Mina NIO");
+                            result = new SocketConnector(); // non-blocking connector
                         }
-                    });
-            break;
 
-        case VM:
-        {
-            _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
-            break;
-        }
+                        // Don't have the connector's worker thread wait around for other connections (we only use
+                        // one SocketConnector per connection at the moment anyway). This allows short-running
+                        // clients (like unit tests) to complete quickly.
+                        result.setWorkerTimeout(0);
+
+                        return result;
+                    }
+                });
+                break;
+
+            case VM:
+            {
+                _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+                break;
+            }
         }
 
         return _instance;
@@ -145,7 +145,7 @@
     }
 
     private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
-        throws AMQVMBrokerCreationException
+            throws AMQVMBrokerCreationException
     {
         int port = details.getPort();
 
@@ -160,7 +160,7 @@
                 else
                 {
                     throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
-                        + " does not exist. Auto create disabled.", null);
+                                                                       + " does not exist. Auto create disabled.", null);
                 }
             }
         }
@@ -257,8 +257,8 @@
         IoHandlerAdapter provider;
         try
         {
-            Class[] cnstr = { Integer.class };
-            Object[] params = { port };
+            Class[] cnstr = {Integer.class};
+            Object[] params = {port};
             provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
             // Give the broker a second to create
             _logger.info("Created VMBroker Instance:" + port);
@@ -277,7 +277,7 @@
             }
 
             AMQVMBrokerCreationException amqbce =
-                new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+                    new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
             amqbce.initCause(e);
             throw amqbce;
         }

Modified: incubator/qpid/branches/M2.1.1/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/pom.xml?rev=598285&r1=598284&r2=598285&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/pom.xml (original)
+++ incubator/qpid/branches/M2.1.1/java/common/pom.xml Mon Nov 26 06:16:01 2007
@@ -6,9 +6,9 @@
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License. You may obtain a copy of the License at
-    
+
     http://www.apache.org/licenses/LICENSE-2.0
-    
+
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,7 +45,7 @@
     </properties>
 
     <build>
-        <plugins>          
+        <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-antrun-plugin</artifactId>
@@ -114,14 +114,14 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.4.0</version> 
+            <version>1.4.0</version>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>1.4.0</version>
-            <scope>test</scope> 
+            <scope>test</scope>
         </dependency>
 
         <dependency>

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,48 @@
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IoFilter;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class WriteBufferFullExeception extends RuntimeException
+{
+    private IoFilter.WriteRequest _writeRequest;
+
+    public WriteBufferFullExeception()
+    {
+        this(null);
+    }
+
+    public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
+    {
+        _writeRequest = writeRequest;
+    }
+
+
+    public void setWriteRequest(IoFilter.WriteRequest writeRequest)
+    {
+        _writeRequest = writeRequest;
+    }
+
+    public IoFilter.WriteRequest getWriteRequest()
+    {
+        return _writeRequest;
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,272 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than
+ * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
+ * cause an Out of Memory exception due to a back log of unprocessed messages.
+ *
+ * This is should only be viewed as a temporary work around for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter as this issue will always occur. On a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class WriteBufferLimitFilterBuilder
+{
+    public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
+
+    private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
+
+    private volatile boolean throwNotBlock = false;
+
+    private volatile int maximumConnectionBufferCount;
+    private volatile long maximumConnectionBufferSize;
+
+    private final Object _blockLock = new Object();
+
+    private int _blockWaiters = 0;
+
+
+    public WriteBufferLimitFilterBuilder()
+    {
+        this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
+    }
+
+    public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
+    {
+        setMaximumConnectionBufferCount(maxWriteBufferSize);
+    }
+
+
+    /**
+     * Set the maximum amount pending items in the writeQueue for a given session.
+     * Changing the value will only take effect when new data is received for a
+     * connection, including existing connections. Default value is 5000 msgs.
+     *
+     * @param maximumConnectionBufferCount New buffer size. Must be > 0
+     */
+    public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
+    {
+        this.maximumConnectionBufferCount = maximumConnectionBufferCount;
+        this.maximumConnectionBufferSize = 0;
+    }
+
+    public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
+    {
+        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+        this.maximumConnectionBufferCount = 0;
+    }
+
+    /**
+     * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
+     * before and after that filter.
+     *
+     * @param chain {@link IoFilterChain} to attach self to.
+     */
+    public void attach(IoFilterChain chain)
+    {
+        String name = getThreadPoolFilterEntryName(chain.getAll());
+
+        chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+    }
+
+    /**
+     * Attach this filter to the specified builder. It will search for the
+     * {@link ExecutorFilter}, and attach itself before and after that filter.
+     *
+     * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
+     */
+    public void attach(DefaultIoFilterChainBuilder builder)
+    {
+        String name = getThreadPoolFilterEntryName(builder.getAll());
+
+        builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+    }
+
+    private String getThreadPoolFilterEntryName(List entries)
+    {
+        Iterator i = entries.iterator();
+
+        while (i.hasNext())
+        {
+            IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next();
+
+            if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class))
+            {
+                return entry.getName();
+            }
+        }
+
+        throw new IllegalStateException("Chain does not contain a ExecutorFilter");
+    }
+
+
+    public class SendLimit extends IoFilterAdapter
+    {
+        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+        {
+            try
+            {
+                waitTillSendAllowed(session);
+            }
+            catch (WriteBufferFullExeception wbfe)
+            {
+                nextFilter.exceptionCaught(session, wbfe);
+            }
+
+            if (writeRequest.getMessage() instanceof ByteBuffer)
+            {
+                increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
+            }
+
+            nextFilter.filterWrite(session, writeRequest);
+        }
+
+        private void increasePendingWriteSize(IoSession session, ByteBuffer message)
+        {
+            synchronized (session)
+            {
+                Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
+                session.setAttribute(PENDING_SIZE, pendingSize);
+            }
+        }
+
+        private boolean sendAllowed(IoSession session)
+        {
+            if (session.isClosing())
+            {
+                return true;
+            }
+
+            int lmswm = maximumConnectionBufferCount;
+            long lmswb = maximumConnectionBufferSize;
+
+            return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
+                   && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
+        }
+
+        private long getScheduledWriteBytes(IoSession session)
+        {
+            synchronized (session)
+            {
+                Long i = (Long) session.getAttribute(PENDING_SIZE);
+                return null == i ? 0 : i;
+            }
+        }
+
+        private void waitTillSendAllowed(IoSession session)
+        {
+            synchronized (_blockLock)
+            {
+                if (throwNotBlock)
+                {
+                    throw new WriteBufferFullExeception();
+                }
+
+                _blockWaiters++;
+
+                while (!sendAllowed(session))
+                {
+                    try
+                    {
+                        _blockLock.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Ignore.
+                    }
+                }
+                _blockWaiters--;
+            }
+        }
+
+        public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
+        {
+            if (message instanceof ByteBuffer)
+            {
+                decrementPendingWriteSize(session, (ByteBuffer) message);
+            }
+            notifyWaitingWriters();
+            nextFilter.messageSent(session, message);
+        }
+
+        private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
+        {
+            synchronized (session)
+            {
+                session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
+            }
+        }
+
+        private void notifyWaitingWriters()
+        {
+            synchronized (_blockLock)
+            {
+                if (_blockWaiters != 0)
+                {
+                    _blockLock.notifyAll();
+                }
+            }
+
+        }
+
+    }//SentLimit
+
+
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,197 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ *         extends CumulativeProtocolDecoder {
+ *
+ *     private Command parseCommand(ByteBuffer in) {
+ *         // Convert the bytes in the specified buffer to a
+ *         // Command object.
+ *         ...
+ *     }
+ *
+ *     protected boolean doDecode(IoSession session, ByteBuffer in,
+ *                                ProtocolDecoderOutput out)
+ *             throws Exception {
+ *
+ *         // Remember the initial position.
+ *         int start = in.position();
+ *
+ *         // Now find the first CRLF in the buffer.
+ *         byte previous = 0;
+ *         while (in.hasRemaining()) {
+ *             byte current = in.get();
+ *
+ *             if (previous == '\r' && current == '\n') {
+ *                 // Remember the current position and limit.
+ *                 int position = in.position();
+ *                 int limit = in.limit();
+ *                 try {
+ *                     in.position(start);
+ *                     in.limit(position);
+ *                     // The bytes between in.position() and in.limit()
+ *                     // now contain a full CRLF terminated line.
+ *                     out.write(parseCommand(in.slice()));
+ *                 } finally {
+ *                     // Set the position to point right after the
+ *                     // detected line and set the limit to the old
+ *                     // one.
+ *                     in.position(position);
+ *                     in.limit(limit);
+ *                 }
+ *                 // Decoded one line; CumulativeProtocolDecoder will
+ *                 // call me again until I return false. So just
+ *                 // return true until there are no more lines in the
+ *                 // buffer.
+ *                 return true;
+ *             }
+ *
+ *             previous = current;
+ *         }
+ *
+ *         // Could not find CRLF in the buffer. Reset the initial
+ *         // position to the one we recorded above.
+ *         in.position(start);
+ *
+ *         return false;
+ *     }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+    private static final String BUFFER = OurCumulativeProtocolDecoder.class
+            .getName()
+            + ".Buffer";
+
+    /**
+     * Creates a new instance.
+     */
+    protected OurCumulativeProtocolDecoder() {
+    }
+
+    /**
+     * Cumulates content of <tt>in</tt> into internal buffer and forwards
+     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+     * and the cumulative buffer is NOT compacted after decoding ends.
+     *
+     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> not consuming the cumulative buffer.
+     */
+    public void decode(IoSession session, ByteBuffer in,
+            ProtocolDecoderOutput out) throws Exception {
+        boolean usingSessionBuffer = true;
+        ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+        // If we have a session buffer, append data to that; otherwise
+        // use the buffer read from the network directly.
+        if (buf != null) {
+            buf.put(in);
+            buf.flip();
+        } else {
+            buf = in;
+            usingSessionBuffer = false;
+        }
+
+        for (;;) {
+            int oldPos = buf.position();
+            boolean decoded = doDecode(session, buf, out);
+            if (decoded) {
+                if (buf.position() == oldPos) {
+                    throw new IllegalStateException(
+                            "doDecode() can't return true when buffer is not consumed.");
+                }
+
+                if (!buf.hasRemaining()) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+
+
+        // if there is any data left that cannot be decoded, we store
+        // it in a buffer in the session and next time this decoder is
+        // invoked the session buffer gets appended to
+        if (buf.hasRemaining()) {
+            storeRemainingInSession(buf, session);
+        } else {
+            if (usingSessionBuffer)
+                removeSessionBuffer(session);
+        }
+    }
+
+    /**
+     * Implement this method to consume the specified cumulative buffer and
+     * decode its content into message(s).
+     *
+     * @param in the cumulative buffer
+     * @return <tt>true</tt> if and only if there's more to decode in the buffer
+     *         and you want to have <tt>doDecode</tt> method invoked again.
+     *         Return <tt>false</tt> if remaining data is not enough to decode,
+     *         then this method will be invoked again when more data is cumulated.
+     * @throws Exception if cannot decode <tt>in</tt>.
+     */
+    protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+            ProtocolDecoderOutput out) throws Exception;
+
+    /**
+     * Releases the cumulative buffer used by the specified <tt>session</tt>.
+     * Please don't forget to call <tt>super.dispose( session )</tt> when
+     * you override this method.
+     */
+    public void dispose(IoSession session) throws Exception {
+        removeSessionBuffer(session);
+    }
+
+    private void removeSessionBuffer(IoSession session) {
+        ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+        if (buf != null) {
+            buf.release();
+        }
+    }
+
+    private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+        ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+        remainingBuf.setAutoExpand(true);
+        remainingBuf.order(buf.order());
+        remainingBuf.put(buf);
+        session.setAttribute(BUFFER, remainingBuf);
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,547 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.NamePreservingRunnable;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketAcceptor extends SocketAcceptor
+{
+    /**
+     * @noinspection StaticNonFinalField
+     */
+    private static volatile int nextId = 0;
+
+    private final Executor executor;
+    private final Object lock = new Object();
+    private final int id = nextId ++;
+    private final String threadName = "SocketAcceptor-" + id;
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+    private final Queue cancelQueue = new Queue();
+
+    private final MultiThreadSocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+
+    /**
+     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+     */
+    private Selector selector;
+    private Worker worker;
+    private int processorDistributor = 0;
+
+    /**
+     * Create an acceptor with a single processing thread using a NewThreadExecutor
+     */
+    public MultiThreadSocketAcceptor()
+    {
+        this( 1, new NewThreadExecutor() );
+    }
+
+    /**
+     * Create an acceptor with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     * @param executor Executor to use for launching threads
+     */
+    public MultiThreadSocketAcceptor( int processorCount, Executor executor )
+    {
+        if( processorCount < 1 )
+        {
+            throw new IllegalArgumentException( "Must have at least one processor" );
+        }
+
+        this.executor = executor;
+        this.processorCount = processorCount;
+        ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+        for( int i = 0; i < processorCount; i++ )
+        {
+            ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
+        }
+    }
+
+
+    /**
+     * Binds to the specified <code>address</code> and handles incoming connections with the specified
+     * <code>handler</code>.  Backlog value is configured to the value of <code>backlog</code> property.
+     *
+     * @throws IOException if failed to bind
+     */
+    public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+    {
+        if( handler == null )
+        {
+            throw new NullPointerException( "handler" );
+        }
+
+        if( address != null && !( address instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+        }
+
+        if( config == null )
+        {
+            config = getDefaultConfig();
+        }
+
+        RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+        synchronized( registerQueue )
+        {
+            registerQueue.push( request );
+        }
+
+        startupWorker();
+
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        if( request.exception != null )
+        {
+            throw request.exception;
+        }
+    }
+
+
+    private synchronized void startupWorker() throws IOException
+    {
+        synchronized( lock )
+        {
+            if( worker == null )
+            {
+                selector = Selector.open();
+                worker = new Worker();
+
+                executor.execute( new NamePreservingRunnable( worker ) );
+            }
+        }
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        CancellationRequest request = new CancellationRequest( address );
+
+        try
+        {
+            startupWorker();
+        }
+        catch( IOException e )
+        {
+            // IOException is thrown only when Worker thread is not
+            // running and failed to open a selector.  We simply throw
+            // IllegalArgumentException here because we can simply
+            // conclude that nothing is bound to the selector.
+            throw new IllegalArgumentException( "Address not bound: " + address );
+        }
+
+        synchronized( cancelQueue )
+        {
+            cancelQueue.push( request );
+        }
+
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+
+            throw request.exception;
+        }
+    }
+
+
+    private class Worker implements Runnable
+    {
+        public void run()
+        {
+            Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
+
+            for( ; ; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    cancelKeys();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( lock )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                    }
+                }
+            }
+        }
+
+        private void processSessions( Set keys ) throws IOException
+        {
+            Iterator it = keys.iterator();
+            while( it.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) it.next();
+
+                it.remove();
+
+                if( !key.isAcceptable() )
+                {
+                    continue;
+                }
+
+                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+                SocketChannel ch = ssc.accept();
+
+                if( ch == null )
+                {
+                    continue;
+                }
+
+                boolean success = false;
+                try
+                {
+
+                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+
+                    MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
+                            MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
+                            req.config, ch, req.handler, req.address );
+
+                    // New Interface
+//                    SocketSessionImpl session = new SocketSessionImpl(
+//                            SocketAcceptor.this, nextProcessor(), getListeners(),
+//                            req.config, ch, req.handler, req.address );
+
+
+                    getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+                    req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+                    req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+                    session.getIoProcessor().addNew( session );
+                    success = true;
+                }
+                catch( Throwable t )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( t );
+                }
+                finally
+                {
+                    if( !success )
+                    {
+                        ch.close();
+                    }
+                }
+            }
+        }
+    }
+
+    private MultiThreadSocketIoProcessor nextProcessor()
+    {
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            RegistrationRequest req;
+
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = null;
+
+            try
+            {
+                ssc = ServerSocketChannel.open();
+                ssc.configureBlocking( false );
+
+                // Configure the server socket,
+                SocketAcceptorConfig cfg;
+                if( req.config instanceof SocketAcceptorConfig )
+                {
+                    cfg = ( SocketAcceptorConfig ) req.config;
+                }
+                else
+                {
+                    cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+                }
+
+                ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+                ssc.socket().setReceiveBufferSize(
+                    ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+                // and bind.
+                ssc.socket().bind( req.address, cfg.getBacklog() );
+                if( req.address == null || req.address.getPort() == 0 )
+                {
+                    req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
+                }
+                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+                synchronized( channels )
+                {
+                    channels.put( req.address, ssc );
+                }
+
+                getListeners().fireServiceActivated(
+                        this, req.address, req.handler, req.config );
+            }
+            catch( IOException e )
+            {
+                req.exception = e;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+
+                    req.notifyAll();
+                }
+
+                if( ssc != null && req.exception != null )
+                {
+                    try
+                    {
+                        ssc.close();
+                    }
+                    catch( IOException e )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            CancellationRequest request;
+
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+
+            if( request == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc;
+            synchronized( channels )
+            {
+                ssc = ( ServerSocketChannel ) channels.remove( request.address );
+            }
+
+            // close the channel
+            try
+            {
+                if( ssc == null )
+                {
+                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ssc.keyFor( selector );
+                    request.registrationRequest = ( RegistrationRequest ) key.attachment();
+                    key.cancel();
+
+                    selector.wakeup(); // wake up again to trigger thread death
+
+                    ssc.close();
+                }
+            }
+            catch( IOException e )
+            {
+                ExceptionMonitor.getInstance().exceptionCaught( e );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+                    request.notifyAll();
+                }
+
+                if( request.exception == null )
+                {
+                    getListeners().fireServiceDeactivated(
+                            this, request.address,
+                            request.registrationRequest.handler,
+                            request.registrationRequest.config );
+                }
+            }
+        }
+    }
+
+    private static class RegistrationRequest
+    {
+        private InetSocketAddress address;
+        private final IoHandler handler;
+        private final IoServiceConfig config;
+        private IOException exception;
+        private boolean done;
+
+        private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+        {
+            this.address = ( InetSocketAddress ) address;
+            this.handler = handler;
+            this.config = config;
+        }
+    }
+
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+        private boolean done;
+        private RegistrationRequest registrationRequest;
+        private RuntimeException exception;
+
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,487 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class MultiThreadSocketConnector extends SocketConnector
+{
+    /** @noinspection StaticNonFinalField */
+    private static volatile int nextId = 0;
+
+    private final Object lock = new Object();
+    private final int id = nextId++;
+    private final String threadName = "SocketConnector-" + id;
+    private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+    private final Queue connectQueue = new Queue();
+    private final MultiThreadSocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+    private final Executor executor;
+
+    /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+    private Selector selector;
+    private Worker worker;
+    private int processorDistributor = 0;
+    private int workerTimeout = 60;  // 1 min.
+
+    /** Create a connector with a single processing thread using a NewThreadExecutor */
+    public MultiThreadSocketConnector()
+    {
+        this(1, new NewThreadExecutor());
+    }
+
+    /**
+     * Create a connector with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     * @param executor       Executor to use for launching threads
+     */
+    public MultiThreadSocketConnector(int processorCount, Executor executor)
+    {
+        if (processorCount < 1)
+        {
+            throw new IllegalArgumentException("Must have at least one processor");
+        }
+
+        this.executor = executor;
+        this.processorCount = processorCount;
+        ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+        for (int i = 0; i < processorCount; i++)
+        {
+            ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+        }
+    }
+
+    /**
+     * How many seconds to keep the connection thread alive between connection requests
+     *
+     * @return Number of seconds to keep connection thread alive
+     */
+    public int getWorkerTimeout()
+    {
+        return workerTimeout;
+    }
+
+    /**
+     * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+     *
+     * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+     */
+    public void setWorkerTimeout(int workerTimeout)
+    {
+        if (workerTimeout < 0)
+        {
+            throw new IllegalArgumentException("Must be >= 0");
+        }
+        this.workerTimeout = workerTimeout;
+    }
+
+    public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+    {
+        return connect(address, null, handler, config);
+    }
+
+    public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+                                 IoHandler handler, IoServiceConfig config)
+    {
+        if (address == null)
+        {
+            throw new NullPointerException("address");
+        }
+        if (handler == null)
+        {
+            throw new NullPointerException("handler");
+        }
+
+        if (!(address instanceof InetSocketAddress))
+        {
+            throw new IllegalArgumentException("Unexpected address type: "
+                                               + address.getClass());
+        }
+
+        if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+        {
+            throw new IllegalArgumentException("Unexpected local address type: "
+                                               + localAddress.getClass());
+        }
+
+        if (config == null)
+        {
+            config = getDefaultConfig();
+        }
+
+        SocketChannel ch = null;
+        boolean success = false;
+        try
+        {
+            ch = SocketChannel.open();
+            ch.socket().setReuseAddress(true);
+            if (localAddress != null)
+            {
+                ch.socket().bind(localAddress);
+            }
+
+            ch.configureBlocking(false);
+
+            if (ch.connect(address))
+            {
+                DefaultConnectFuture future = new DefaultConnectFuture();
+                newSession(ch, handler, config, future);
+                success = true;
+                return future;
+            }
+
+            success = true;
+        }
+        catch (IOException e)
+        {
+            return DefaultConnectFuture.newFailedFuture(e);
+        }
+        finally
+        {
+            if (!success && ch != null)
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+            }
+        }
+
+        ConnectionRequest request = new ConnectionRequest(ch, handler, config);
+        synchronized (lock)
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch (IOException e2)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e2);
+                }
+
+                return DefaultConnectFuture.newFailedFuture(e);
+            }
+        }
+
+        synchronized (connectQueue)
+        {
+            connectQueue.push(request);
+        }
+        selector.wakeup();
+
+        return request;
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if (worker == null)
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            executor.execute(new NamePreservingRunnable(worker));
+        }
+    }
+
+    private void registerNew()
+    {
+        if (connectQueue.isEmpty())
+        {
+            return;
+        }
+
+        for (; ;)
+        {
+            ConnectionRequest req;
+            synchronized (connectQueue)
+            {
+                req = (ConnectionRequest) connectQueue.pop();
+            }
+
+            if (req == null)
+            {
+                break;
+            }
+
+            SocketChannel ch = req.channel;
+            try
+            {
+                ch.register(selector, SelectionKey.OP_CONNECT, req);
+            }
+            catch (IOException e)
+            {
+                req.setException(e);
+            }
+        }
+    }
+
+    private void processSessions(Set keys)
+    {
+        Iterator it = keys.iterator();
+
+        while (it.hasNext())
+        {
+            SelectionKey key = (SelectionKey) it.next();
+
+            if (!key.isConnectable())
+            {
+                continue;
+            }
+
+            SocketChannel ch = (SocketChannel) key.channel();
+            ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+            boolean success = false;
+            try
+            {
+                ch.finishConnect();
+                newSession(ch, entry.handler, entry.config, entry);
+                success = true;
+            }
+            catch (Throwable e)
+            {
+                entry.setException(e);
+            }
+            finally
+            {
+                key.cancel();
+                if (!success)
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch (IOException e)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                    }
+                }
+            }
+        }
+
+        keys.clear();
+    }
+
+    private void processTimedOutSessions(Set keys)
+    {
+        long currentTime = System.currentTimeMillis();
+        Iterator it = keys.iterator();
+
+        while (it.hasNext())
+        {
+            SelectionKey key = (SelectionKey) it.next();
+
+            if (!key.isValid())
+            {
+                continue;
+            }
+
+            ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+            if (currentTime >= entry.deadline)
+            {
+                entry.setException(new ConnectException());
+                try
+                {
+                    key.channel().close();
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+                finally
+                {
+                    key.cancel();
+                }
+            }
+        }
+    }
+
+    private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+            throws IOException
+    {
+        MultiThreadSocketSessionImpl session =
+                new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
+                                                 config, ch, handler, ch.socket().getRemoteSocketAddress());
+
+        //new interface
+//        SocketSessionImpl session = new SocketSessionImpl(
+//                this, nextProcessor(), getListeners(),
+//                config, ch, handler, ch.socket().getRemoteSocketAddress() );
+        try
+        {
+            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getThreadModel().buildFilterChain(session.getFilterChain());
+        }
+        catch (Throwable e)
+        {
+            throw (IOException) new IOException("Failed to create a session.").initCause(e);
+        }
+
+        // Set the ConnectFuture of the specified session, which will be
+        // removed and notified by AbstractIoFilterChain eventually.
+//        session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+        session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture);
+
+        // Forward the remaining process to the SocketIoProcessor.
+        session.getIoProcessor().addNew(session);
+    }
+
+    private MultiThreadSocketIoProcessor nextProcessor()
+    {
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+
+    private class Worker implements Runnable
+    {
+        private long lastActive = System.currentTimeMillis();
+
+        public void run()
+        {
+            Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
+
+            for (; ;)
+            {
+                try
+                {
+                    int nKeys = selector.select(1000);
+
+                    registerNew();
+
+                    if (nKeys > 0)
+                    {
+                        processSessions(selector.selectedKeys());
+                    }
+
+                    processTimedOutSessions(selector.keys());
+
+                    if (selector.keys().isEmpty())
+                    {
+                        if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+                        {
+                            synchronized (lock)
+                            {
+                                if (selector.keys().isEmpty() &&
+                                    connectQueue.isEmpty())
+                                {
+                                    worker = null;
+                                    try
+                                    {
+                                        selector.close();
+                                    }
+                                    catch (IOException e)
+                                    {
+                                        ExceptionMonitor.getInstance().exceptionCaught(e);
+                                    }
+                                    finally
+                                    {
+                                        selector = null;
+                                    }
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    else
+                    {
+                        lastActive = System.currentTimeMillis();
+                    }
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    try
+                    {
+                        Thread.sleep(1000);
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
+                    }
+                }
+            }
+        }
+    }
+
+    private class ConnectionRequest extends DefaultConnectFuture
+    {
+        private final SocketChannel channel;
+        private final long deadline;
+        private final IoHandler handler;
+        private final IoServiceConfig config;
+
+        private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+        {
+            this.channel = channel;
+            long timeout;
+            if (config instanceof IoConnectorConfig)
+            {
+                timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+            }
+            else
+            {
+                timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+            }
+            this.deadline = System.currentTimeMillis() + timeout;
+            this.handler = handler;
+            this.config = config;
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java?rev=598285&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java (added)
+++ incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java Mon Nov 26 06:16:01 2007
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
+
+    MultiThreadSocketFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+        Queue writeRequestQueue = s.getWriteRequestQueue();
+
+        // SocketIoProcessor.doFlush() will reset it after write is finished
+        // because the buffer will be passed with messageSent event.
+        ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+        synchronized( writeRequestQueue )
+        {
+            writeRequestQueue.push( writeRequest );
+            if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
+            {
+                // Notify SocketIoProcessor only when writeRequestQueue was empty.
+                s.getIoProcessor().flush( s );
+            }
+        }
+    }
+
+    protected void doClose( IoSession session ) throws IOException
+    {
+        MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+        s.getIoProcessor().remove( s );
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date