You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/15 03:06:26 UTC

svn commit: r825362 [1/2] - in /qpid/trunk/qpid/java: broker/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/connection/ broker/src/main/java/org/apach...

Author: aidan
Date: Thu Oct 15 01:06:23 2009
New Revision: 825362

URL: http://svn.apache.org/viewvc?rev=825362&view=rev
Log:
Merge java-network-refactor branch

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/
      - copied from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/
      - copied from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/
      - copied from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
      - copied unchanged from r825319, qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
Modified:
    qpid/trunk/qpid/java/broker/build.xml
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
    qpid/trunk/qpid/java/client/build.xml
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
    qpid/trunk/qpid/java/systests/build.xml
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Modified: qpid/trunk/qpid/java/broker/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/build.xml?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/build.xml (original)
+++ qpid/trunk/qpid/java/broker/build.xml Thu Oct 15 01:06:23 2009
@@ -21,6 +21,7 @@
 <project name="AMQ Broker" default="build">
 
     <property name="module.depends" value="management/common common"/>
+    <property name="module.test.depends" value="common/test" />
     <property name="module.main" value="org.apache.qpid.server.Main"/>
 
     <import file="../module.xml"/>

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Oct 15 01:06:23 2009
@@ -41,7 +41,6 @@
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Oct 15 01:06:23 2009
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.server;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -30,16 +37,9 @@
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
 import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -48,19 +48,13 @@
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.management.LoggingManagementMBean;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 
 /**
  * Main entry point for AMQPD.
@@ -300,20 +294,6 @@
             _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
                                + " build: " + QpidProperties.getBuildVersion());
 
-            ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
-            // the MINA default is currently to use the pooled allocator although this may change in future
-            // once more testing of the performance of the simple allocator has been done
-            if (!serverConfig.getEnablePooledAllocator())
-            {
-                ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
-            }
-
-            if (serverConfig.getUseBiasedWrites())
-            {
-                System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
-            }
-
             int port = serverConfig.getPort();
 
             String portStr = commandLine.getOptionValue("p");
@@ -329,7 +309,54 @@
                 }
             }
 
-            bind(port, serverConfig);
+            String bindAddr = commandLine.getOptionValue("b");
+            if (bindAddr == null)
+            {
+                bindAddr = serverConfig.getBind();
+            }
+            InetAddress bindAddress = null;
+            if (bindAddr.equals("wildcard"))
+            {
+                bindAddress = new InetSocketAddress(port).getAddress();
+            }
+            else
+            {
+                bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+            }
+
+            String keystorePath = serverConfig.getKeystorePath();
+            String keystorePassword = serverConfig.getKeystorePassword();
+            String certType = serverConfig.getCertType();
+            SSLContextFactory sslFactory = null;
+            boolean isSsl = false;
+            
+            if (!serverConfig.getSSLOnly())
+            {
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), 
+                            serverConfig.getNetworkConfiguration(), null);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), 
+                                                              new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+            }
+            
+            if (serverConfig.getEnableSSL())
+            {
+                sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, 
+                            new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), 
+                        new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+            }
+            
+            //fixme  qpid.AMQP should be using qpidproperties to get value
+            _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+                    + " build: " + QpidProperties.getBuildVersion());
+
+            CurrentActor.get().message(BrokerMessages.BRK_1004());
+
         }
         finally
         {
@@ -358,114 +385,6 @@
         }
     }
 
-    protected void bind(int port, ServerConfiguration config) throws BindException
-    {
-        String bindAddr = commandLine.getOptionValue("b");
-        if (bindAddr == null)
-        {
-            bindAddr = config.getBind();
-        }
-
-        try
-        {
-            IoAcceptor acceptor;
-            
-            if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
-            {
-                _logger.warn("Using Qpid Multithreaded IO Processing");
-                acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
-            }
-            else
-            {
-                _logger.warn("Using Mina IO Processing");
-                acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
-            }
-            
-            SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
-            SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
-            sc.setReceiveBufferSize(config.getReceiveBufferSize());
-            sc.setSendBufferSize(config.getWriteBufferSize());
-            sc.setTcpNoDelay(config.getTcpNoDelay());
-
-            // if we do not use the executor pool threading model we get the default leader follower
-            // implementation provided by MINA
-            if (config.getEnableExecutorPool())
-            {
-                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
-            }
-
-            if (!config.getEnableSSL() || !config.getSSLOnly())
-            {
-                AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
-                InetSocketAddress bindAddress;
-                if (bindAddr.equals("wildcard"))
-                {
-                    bindAddress = new InetSocketAddress(port);
-                }
-                else
-                {
-                    bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
-                }
-
-                bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
-                //fixme  qpid.AMQP should be using qpidproperties to get value
-                _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
-            }
-
-            if (config.getEnableSSL())
-            {
-                AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
-                try
-                {
-
-                    bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
-                    //fixme  qpid.AMQP should be using qpidproperties to get value
-                    _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
-                }
-                catch (IOException e)
-                {
-                    _brokerLogger.error("Unable to listen on SSL port: " + e, e);
-                }
-            }
-
-            //fixme  qpid.AMQP should be using qpidproperties to get value
-            _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
-                               + " build: " + QpidProperties.getBuildVersion());
-
-            CurrentActor.get().message(BrokerMessages.BRK_1004());
-
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unable to bind service to registry: " + e, e);
-            //fixme this need tidying up
-            throw new BindException(e.getMessage());
-        }
-    }
-
-    /**
-     * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
-     *
-     * @param acceptor
-     * @param bindAddress
-     * @param handler
-     * @param sconfig
-     *
-     * @throws IOException from the acceptor.bind command
-     */
-    private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
-    {
-        acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
-        CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
-        ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
-    }
-
     public static void main(String[] args)
     {
         //if the -Dlog4j.configuration property has not been set, enable the init override

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Oct 15 01:06:23 2009
@@ -38,6 +38,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -570,7 +571,7 @@
 
     public boolean getSSLOnly()
     {
-        return getConfig().getBoolean("connector.ssl.sslOnly", true);
+        return getConfig().getBoolean("connector.ssl.sslOnly", false);
     }
 
     public int getSSLPort()
@@ -619,4 +620,57 @@
                    getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
                            DEFAULT_HOUSEKEEPING_PERIOD));
     }
+
+    public NetworkDriverConfiguration getNetworkConfiguration()
+    {
+        return new NetworkDriverConfiguration()
+        {
+            
+            public Integer getTrafficClass()
+            {
+                return null;
+            }
+            
+            public Boolean getTcpNoDelay()
+            {
+                // Can't call parent getTcpNoDelay since it just calls this one
+                return getConfig().getBoolean("connector.tcpNoDelay", true);
+            }
+            
+            public Integer getSoTimeout()
+            {
+                return null;
+            }
+            
+            public Integer getSoLinger()
+            {
+                return null;
+            }
+            
+            public Integer getSendBufferSize()
+            {
+                return getBufferWriteLimit();
+            }
+            
+            public Boolean getReuseAddress()
+            {
+                return null;
+            }
+            
+            public Integer getReceiveBufferSize()
+            {
+                return getBufferReadLimit();
+            }
+            
+            public Boolean getOOBInline()
+            {
+                return null;
+            }
+            
+            public Boolean getKeepAlive()
+            {
+                return null;
+            }
+        };
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Thu Oct 15 01:06:23 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.connection;
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQConnectionException;
@@ -45,6 +44,14 @@
     {
 
     }
+    
+    public void expireClosedChannels()
+    {
+        for (AMQProtocolSession connection : _registry)
+        {
+            connection.closeIfLingeringClosedChannels();
+        }
+    }
 
     /** Close all of the currently open connections. */
     public void close() throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Thu Oct 15 01:06:23 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.connection;
 
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Oct 15 01:06:23 2009
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.security.Principal;
+import java.util.List;
 
 
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -210,5 +211,21 @@
     public MethodDispatcher getMethodDispatcher();
 
     public ProtocolSessionIdentifier getSessionIdentifier();
+
+    String getClientVersion();
+
+    long getLastIoTime();
+
+    long getWrittenBytes();
+
+    Long getMaximumNumberOfChannels();
+
+    void setMaximumNumberOfChannels(Long value);
+
+    void commitTransactions(AMQChannel channel) throws AMQException;
+
+    List<AMQChannel> getChannels();
+
+    void closeIfLingeringClosedChannels();
     
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Oct 15 01:06:23 2009
@@ -79,7 +79,7 @@
 @MBeanDescription("Management Bean for an AMQ Broker Connection")
 public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
 {
-    private AMQMinaProtocolSession _session = null;
+    private AMQProtocolSession _protocolSession = null;
     private String _name = null;
 
     // openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@
         new AMQShortString("Broker Management Console has closed the connection.");
 
     @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
-    public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+    public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
     {
         super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
-        _session = session;
+        _protocolSession = amqProtocolSession;
         String remote = getRemoteAddress();
         remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
         _name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@
 
     public String getClientId()
     {
-        return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+        return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
     }
 
     public String getAuthorizedId()
     {
-        return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+        return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
     }
 
     public String getVersion()
     {
-        return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+        return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
     }
 
     public Date getLastIoTime()
     {
-        return new Date(_session.getIOSession().getLastIoTime());
+        return new Date(_protocolSession.getLastIoTime());
     }
 
     public String getRemoteAddress()
     {
-        return _session.getIOSession().getRemoteAddress().toString();
+        return _protocolSession.getRemoteAddress().toString();
     }
 
     public ManagedObject getParentObject()
     {
-        return _session.getVirtualHost().getManagedObject();
+        return _protocolSession.getVirtualHost().getManagedObject();
     }
 
     public Long getWrittenBytes()
     {
-        return _session.getIOSession().getWrittenBytes();
+        return _protocolSession.getWrittenBytes();
     }
 
     public Long getReadBytes()
     {
-        return _session.getIOSession().getReadBytes();
+        return _protocolSession.getWrittenBytes();
     }
 
     public Long getMaximumNumberOfChannels()
     {
-        return _session.getMaximumNumberOfChannels();
+        return _protocolSession.getMaximumNumberOfChannels();
     }
 
     public void setMaximumNumberOfChannels(Long value)
     {
-        _session.setMaximumNumberOfChannels(value);
+        _protocolSession.setMaximumNumberOfChannels(value);
     }
 
     public String getObjectInstanceName()
@@ -192,13 +192,13 @@
         CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
         try
         {
-            AMQChannel channel = _session.getChannel(channelId);
+            AMQChannel channel = _protocolSession.getChannel(channelId);
             if (channel == null)
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
 
-            _session.commitTransactions(channel);
+            _protocolSession.commitTransactions(channel);
         }
         catch (AMQException ex)
         {
@@ -221,13 +221,13 @@
         CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
         try
         {
-            AMQChannel channel = _session.getChannel(channelId);
+            AMQChannel channel = _protocolSession.getChannel(channelId);
             if (channel == null)
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
 
-            _session.rollbackTransactions(channel);
+            _protocolSession.rollbackTransactions(channel);
         }
         catch (AMQException ex)
         {
@@ -248,7 +248,7 @@
     public TabularData channels() throws OpenDataException
     {
         TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
-        List<AMQChannel> list = _session.getChannels();
+        List<AMQChannel> list = _protocolSession.getChannels();
 
         for (AMQChannel channel : list)
         {
@@ -274,7 +274,7 @@
     public void closeConnection() throws JMException
     {
 
-        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
         ConnectionCloseBody responseBody =
                 methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
                                                          // replyCode
@@ -301,12 +301,12 @@
 
         try
         {
-            _session.writeFrame(responseBody.generateFrame(0));
+            _protocolSession.writeFrame(responseBody.generateFrame(0));
 
             try
             {
 
-                _session.closeSession();
+                _protocolSession.closeSession();
             }
             catch (AMQException ex)
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Oct 15 01:06:23 2009
@@ -258,7 +258,7 @@
             for (InetSocketAddress bindAddress : _acceptors.keySet())
             {
                 QpidAcceptor acceptor = _acceptors.get(bindAddress);
-                acceptor.getIoAcceptor().unbind(bindAddress);
+                acceptor.getNetworkDriver().close();
                 CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
             }
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Thu Oct 15 01:06:23 2009
@@ -23,7 +23,6 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
@@ -32,7 +31,6 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -180,13 +178,13 @@
     @Override
     public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
     {
-        if (!(session instanceof AMQMinaProtocolSession))
+        SocketAddress sockAddr = session.getRemoteAddress();
+        if (!(sockAddr instanceof InetSocketAddress))
         {
-            return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
-                                        // mean MINA right now
+            return AuthzResult.ABSTAIN; // We only deal with tcp sessions
         }
 
-        InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+        InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
 
         if (addr == null)
         {
@@ -213,19 +211,6 @@
 
     }
 
-    private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
-    {
-        SocketAddress remote = session.getIOSession().getRemoteAddress();
-        if (remote instanceof InetSocketAddress)
-        {
-            return ((InetSocketAddress) remote).getAddress();
-        }
-        else
-        {
-            return null;
-        }
-    }
-
     public void setConfiguration(Configuration config) throws ConfigurationException
     {
         // Get default action

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Thu Oct 15 01:06:23 2009
@@ -20,21 +20,21 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
 
 public class QpidAcceptor
 {
-    IoAcceptor _acceptor;
+    NetworkDriver _driver;
     String _protocol;
-    public QpidAcceptor(IoAcceptor acceptor, String protocol)
+    public QpidAcceptor(NetworkDriver driver, String protocol)
     {
-        _acceptor = acceptor;
+        _driver = driver;
         _protocol = protocol;
     }
 
-    public IoAcceptor getIoAcceptor()
+    public NetworkDriver getNetworkDriver()
     {
-        return _acceptor;
+        return _driver;
     }
 
     public String toString()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Oct 15 01:06:23 2009
@@ -279,6 +279,14 @@
             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
                                                    period / 2,
                                                    period);
+            
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
         }
     }
     

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Thu Oct 15 01:06:23 2009
@@ -20,26 +20,26 @@
  */
 package org.apache.qpid.server.configuration;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
 import junit.framework.TestCase;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
+import org.apache.qpid.transport.TestNetworkDriver;
 
 public class ServerConfigurationTest extends TestCase
 {
@@ -589,12 +589,12 @@
     {
         // Check default
         ServerConfiguration serverConfig = new ServerConfiguration(_config);
-        assertEquals(true, serverConfig.getSSLOnly());
+        assertEquals(false, serverConfig.getSSLOnly());
 
         // Check value we set
-        _config.setProperty("connector.ssl.sslOnly", false);
+        _config.setProperty("connector.ssl.sslOnly", true);
         serverConfig = new ServerConfiguration(_config);
-        assertEquals(false, serverConfig.getSSLOnly());
+        assertEquals(true, serverConfig.getSSLOnly());
     }
 
     public void testGetSSLPort() throws ConfigurationException
@@ -791,16 +791,15 @@
         // Test config
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setRemoteAddress("127.0.0.1");
 
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
 
-        iosession.setAddress("127.1.2.3");
-        session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        testDriver.setRemoteAddress("127.1.2.3");
+        session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
     }
 
@@ -866,12 +865,12 @@
         // Test config
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setRemoteAddress("127.0.0.1");
 
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
+        session.setNetworkDriver(testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
     }
 
@@ -935,12 +934,11 @@
         ApplicationRegistry.initialise(reg, 1);
 
         // Test config
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setRemoteAddress("127.0.0.1");
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
 
         RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw");

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Oct 15 01:06:23 2009
@@ -44,7 +44,7 @@
     private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
 
     private MessageStore _messageStore = new SkeletonMessageStore();
-    private AMQMinaProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolSession;
     private AMQChannel _channel;
     private AMQProtocolSessionMBean _mbean;
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Thu Oct 15 01:06:23 2009
@@ -20,23 +20,23 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.security.Principal;
 
-public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
+
+public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
 {
     // ChannelID(LIST)  -> LinkedList<Pair>
     final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
@@ -44,9 +44,7 @@
 
     public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
     {
-        super(new TestIoSession(),
-              ApplicationRegistry.getInstance().getVirtualHostRegistry(),
-              new AMQCodecFactory(true));
+        super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
 
         _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Thu Oct 15 01:06:23 2009
@@ -37,7 +37,7 @@
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends TestCase
 {
-	private AMQMinaProtocolSession _session;
+	private AMQProtocolEngine _session;
 
     public void testChannels() throws Exception
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Oct 15 01:06:23 2009
@@ -20,37 +20,34 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import javax.management.Notification;
+
 import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.Notification;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
-import java.util.Set;
-import java.security.Principal;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
@@ -62,7 +59,7 @@
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private VirtualHost _virtualHost;
-    private AMQMinaProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolSession;
     private MessageStore _messageStore = new MemoryMessageStore();
     private StoreContext _storeContext = new StoreContext();
     private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Thu Oct 15 01:06:23 2009
@@ -30,17 +30,14 @@
 import junit.framework.TestCase;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
 
 public class FirewallPluginTest extends TestCase
 {
@@ -84,22 +81,22 @@
 
     private TestableMemoryMessageStore _store;
     private VirtualHost _virtualHost;
-    private AMQMinaProtocolSession _session;
+    private AMQProtocolEngine _session;
+    private TestNetworkDriver _testDriver;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
         _store = new TestableMemoryMessageStore();
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        _testDriver = new TestNetworkDriver();
+        _testDriver.setRemoteAddress("127.0.0.1");
 
         // Retreive VirtualHost from the Registry
         VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
         _virtualHost = virtualHostRegistry.getVirtualHost("test");
 
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-        _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);        
+        _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver);
     }
 
     public void tearDown() throws Exception
@@ -170,7 +167,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -185,7 +182,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -198,7 +195,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -211,7 +208,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -234,7 +231,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -257,7 +254,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -271,7 +268,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -285,7 +282,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -295,11 +292,11 @@
         firstRule.setAccess("allow");
         firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
-        ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1");
+        _testDriver.setRemoteAddress("10.0.0.1");
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     

Modified: qpid/trunk/qpid/java/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/build.xml?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/build.xml (original)
+++ qpid/trunk/qpid/java/client/build.xml Thu Oct 15 01:06:23 2009
@@ -20,7 +20,7 @@
  -->
 <project name="AMQ Client" default="build">
 
-  <property name="module.depends" value="common"/>
+  <property name="module.depends" value="common common/test"/>
   <property name="module.genpom" value="true"/>
   <property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Oct 15 01:06:23 2009
@@ -97,7 +97,7 @@
         {
             _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
         }
-        
+        _conn._protocolHandler.getProtocolSession().init();
         // this blocks until the connection has been set up or when an error
         // has prevented the connection being set up
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Oct 15 01:06:23 2009
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.client.failover;
 
-import org.apache.mina.common.IoSession;
-
 import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQStateManager;
 
@@ -81,9 +80,6 @@
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
 
-    /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
-    private final IoSession _session;
-
     /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
     private AMQProtocolHandler _amqProtocolHandler;
 
@@ -99,10 +95,9 @@
      * @param amqProtocolHandler The protocol handler that spans the failover.
      * @param session            The MINA session, for the failing connection.
      */
-    public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+    public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
     {
         _amqProtocolHandler = amqProtocolHandler;
-        _session = session;
     }
 
     /**
@@ -139,29 +134,10 @@
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-
-
-            // We are failing over so lets ensure any existing ProtocolSessions
-            // are closed. Closing them will update the stateManager which we
-            // probably don't want to record the change to the closed state.
-            // So lets make a new one.
-            _amqProtocolHandler.setStateManager(new AMQStateManager());
-
-            // Close the session, we need to wait for it to close as there may have
-            // been data in transit such as an ack that is still valid to send.
-            //
-            // While we are allowing data to continue to be written to the
-            // socket assuming the connection is still valid, we do not consider
-            // the possibility that the problem that triggered failover was
-            // entirely client side. In that situation the socket will still be
-            // open and the we should really send a ConnectionClose to be AMQP
-            // compliant.
-            _amqProtocolHandler.getProtocolSession().closeProtocolSession();
-
+            
             // Use a fresh new StateManager for the reconnection attempts
             _amqProtocolHandler.setStateManager(new AMQStateManager());
 
-
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
                 _logger.info("Failover process veto-ed by client");
@@ -240,7 +216,7 @@
                     _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
                     /*try
                     {*/
-                    _amqProtocolHandler.exceptionCaught(_session, e);
+                    _amqProtocolHandler.exception(e);
                     /*}
                     catch (Exception ex)
                     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Oct 15 01:06:23 2009
@@ -20,24 +20,22 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverHandler;
 import org.apache.qpid.client.failover.FailoverState;
@@ -46,23 +44,29 @@
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
  * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -102,9 +106,6 @@
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
@@ -120,7 +121,7 @@
  * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
  * that lifecycles of the fields match lifecycles of their containing objects.
  */
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +138,7 @@
     private volatile AMQProtocolSession _protocolSession;
 
     /** Holds the state of the protocol session. */
-    private AMQStateManager _stateManager = new AMQStateManager();
+    private AMQStateManager _stateManager;
 
     /** Holds the method listeners, */
     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +167,15 @@
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
-
+    private AMQCodecFactory _codecFactory;
+    private Job _readJob;
+    private Job _writeJob;
+    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+    private NetworkDriver _networkDriver;
+    
+    private long _writtenBytes;
+    private long _readBytes;
+    
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -175,86 +184,13 @@
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
-    }
-
-    /**
-     * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
-     * session, which filters the events handled by this handler. The filter chain consists of, handing off events
-     * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
-     *
-     * @param session The MINA session.
-     *
-     * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
-     */
-    public void sessionCreated(IoSession session) throws Exception
-    {
-        _logger.debug("Protocol session created for session " + System.identityHashCode(session));
-        _failoverHandler = new FailoverHandler(this, session);
-
-        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
-
-        if (Boolean.getBoolean("amqj.shared_read_write_pool"))
-        {
-            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
-        }
-        else
-        {
-            session.getFilterChain().addLast("protocolFilter", pcf);
-        }
-        // we only add the SSL filter where we have an SSL connection
-        if (_connection.getSSLConfiguration() != null)
-        {
-            SSLConfiguration sslConfig = _connection.getSSLConfiguration();
-            SSLContextFactory sslFactory =
-                    new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
-            SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
-            sslFilter.setUseClientMode(true);
-            session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
-        }
-
-        try
-        {
-            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
-            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
-            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
-        }
-        catch (RuntimeException e)
-        {
-            _logger.error(e.getMessage(), e);
-        }
-
-        if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
-        {
-            try
-            {
-                //Add IO Protection Filters
-                IoFilterChain chain = session.getFilterChain();
-
-                session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
-                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-                readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
-                        ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
-                readfilter.attach(chain);
-
-                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-                writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
-                        ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
-                writefilter.attach(chain);
-                session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
-                _logger.info("Using IO Read/Write Filter Protection");
-            }
-            catch (Exception e)
-            {
-                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
-            }
-        }
-        _protocolSession = new AMQProtocolSession(this, session, _connection);
-
-        _stateManager.setProtocolSession(_protocolSession);
-
-        _protocolSession.init();
+        _protocolSession = new AMQProtocolSession(this, _connection);
+        _stateManager = new AMQStateManager(_protocolSession);
+        _codecFactory = new AMQCodecFactory(false, _protocolSession);
+        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+        _poolReference.acquireExecutorService();
+        _failoverHandler = new FailoverHandler(this);
     }
 
     /**
@@ -283,12 +219,10 @@
      * may be called first followed by this method. This depends on whether the client was trying to send data at the
      * time of the failure.
      *
-     * @param session The MINA session.
-     *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
      * not otherwise? The above comment doesn't make that clear.
      */
-    public void sessionClosed(IoSession session)
+    public void closed()
     {
         if (_connection.isClosed())
         {
@@ -327,7 +261,8 @@
                 {
                     _logger.debug("sessionClose() not allowed to failover");
                     _connection.exceptionReceived(new AMQDisconnectedException(
-                            "Server closed connection and reconnection " + "not permitted.", null));
+                            "Server closed connection and reconnection " + "not permitted.", 
+                            _stateManager.getLastException()));
                 }
                 else
                 {
@@ -350,33 +285,28 @@
         failoverThread.start();
     }
 
-    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    @Override
+    public void readerIdle()
     {
-        _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
-        if (IdleStatus.WRITER_IDLE.equals(status))
-        {
-            // write heartbeat frame:
-            _logger.debug("Sent heartbeat");
-            session.write(HeartbeatBody.FRAME);
-            HeartbeatDiagnostics.sent();
-        }
-        else if (IdleStatus.READER_IDLE.equals(status))
-        {
-            // failover:
-            HeartbeatDiagnostics.timeout();
-            _logger.warn("Timed out while waiting for heartbeat from peer.");
-            session.close();
-        }
+        _logger.debug("Protocol Session [" + this + "] idle: reader");
+        //  failover:
+        HeartbeatDiagnostics.timeout();
+        _logger.warn("Timed out while waiting for heartbeat from peer.");
+        _networkDriver.close();
+    }
+    
+    @Override
+    public void writerIdle()
+    {
+        _logger.debug("Protocol Session [" + this + "] idle: reader");
+        writeFrame(HeartbeatBody.FRAME);
+        HeartbeatDiagnostics.sent();
     }
 
     /**
-     * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
-     * IOException, MINA will close the connection automatically.
-     *
-     * @param session The MINA session.
-     * @param cause   The exception that triggered this event.
+     * Invoked when any exception is thrown by the NetworkDriver
      */
-    public void exceptionCaught(IoSession session, Throwable cause)
+    public void exception(Throwable cause)
     {
         if (_failoverState == FailoverState.NOT_STARTED)
         {
@@ -384,9 +314,9 @@
             if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
-                // this will attemp failover
-
-                sessionClosed(session);
+                // this will attempt failover
+                _networkDriver.close();
+                closed();
             }
             else
             {
@@ -490,48 +420,84 @@
 
     private static int _messageReceivedCount;
 
-    public void messageReceived(IoSession session, Object message) throws Exception
-    {
-        if (PROTOCOL_DEBUG)
-        {
-            _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
-        }
 
-        if(message instanceof AMQFrame)
+    @Override
+    public void received(ByteBuffer msg)
+    {
+        try
         {
-            final boolean debug = _logger.isDebugEnabled();
-            final long msgNumber = ++_messageReceivedCount;
+            _readBytes += msg.remaining();
+            final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
-            if (debug && ((msgNumber % 1000) == 0))
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
             {
-                _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-            }
-
-            AMQFrame frame = (AMQFrame) message;
-
-            final AMQBody bodyFrame = frame.getBodyFrame();
-
-            HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+                @Override
+                public void run()
+                {
+                    // Dispatch each data block already decoded from the buffer
 
-            bodyFrame.handle(frame.getChannel(), _protocolSession);
+                    for (AMQDataBlock message : dataBlocks)
+                    {
 
-            _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+                        try
+                        {
+                            if (PROTOCOL_DEBUG)
+                            {
+                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+                            }
+
+                            if(message instanceof AMQFrame)
+                            {
+                                final boolean debug = _logger.isDebugEnabled();
+                                final long msgNumber = ++_messageReceivedCount;
+
+                                if (debug && ((msgNumber % 1000) == 0))
+                                {
+                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+                                }
+
+                                AMQFrame frame = (AMQFrame) message;
+
+                                final AMQBody bodyFrame = frame.getBodyFrame();
+
+                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+                                bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+                                _connection.bytesReceived(_readBytes);
+                            }
+                            else if (message instanceof ProtocolInitiation)
+                            {
+                                // We get here if the server sends a response to our initial protocol header
+                                // suggesting an alternate ProtocolVersion; the server will then close the
+                                // connection.
+                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+                                ProtocolVersion pv = protocolInit.checkVersion();
+                                getConnection().setProtocolVersion(pv);
+
+                                // get round a bug in old versions of qpid whereby the connection is not closed
+                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            e.printStackTrace();
+                            _logger.error("Exception processing frame", e);
+                            propagateExceptionToFrameListeners(e);
+                            exception(e);
+                        }
+                    }
+                }
+            });
         }
-        else if (message instanceof ProtocolInitiation)
+        catch (Exception e)
         {
-            // We get here if the server sends a response to our initial protocol header
-            // suggesting an alternate ProtocolVersion; the server will then close the
-            // connection.
-            ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-            ProtocolVersion pv = protocolInit.checkVersion();
-            getConnection().setProtocolVersion(pv);
-
-            // get round a bug in old versions of qpid whereby the connection is not closed
-            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            propagateExceptionToFrameListeners(e);
+            exception(e);
         }
     }
 
-    public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+    public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
             throws AMQException
     {
 
@@ -571,32 +537,13 @@
         {
             propagateExceptionToFrameListeners(e);
 
-            exceptionCaught(session, e);
+            exception(e);
         }
 
     }
 
     private static int _messagesOut;
 
-    public void messageSent(IoSession session, Object message) throws Exception
-    {
-        if (PROTOCOL_DEBUG)
-        {
-            _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
-        }
-
-        final long sentMessages = _messagesOut++;
-
-        final boolean debug = _logger.isDebugEnabled();
-
-        if (debug && ((sentMessages % 1000) == 0))
-        {
-            _logger.debug("Sent " + _messagesOut + " protocol messages");
-        }
-
-        _connection.bytesSent(session.getWrittenBytes());
-    }
-
     public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
     {
         return getStateManager().createWaiter(states);
@@ -610,12 +557,41 @@
      */
     public void writeFrame(AMQDataBlock frame)
     {
-        _protocolSession.writeFrame(frame);
+        writeFrame(frame, false);
     }
 
     public void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        _protocolSession.writeFrame(frame, wait);
+        final ByteBuffer buf = frame.toNioByteBuffer();
+        _writtenBytes += buf.remaining();
+        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                _networkDriver.send(buf);
+            }
+        });
+        if (PROTOCOL_DEBUG)
+        {
+            _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+        }
+
+        final long sentMessages = _messagesOut++;
+
+        final boolean debug = _logger.isDebugEnabled();
+
+        if (debug && ((sentMessages % 1000) == 0))
+        {
+            _logger.debug("Sent " + _messagesOut + " protocol messages");
+        }
+
+        _connection.bytesSent(_writtenBytes);
+        
+        if (wait)
+        {
+            _networkDriver.flush();
+        }
     }
 
     /**
@@ -673,7 +649,7 @@
                 //FIXME: Either at this point or before the add, we should check that _stateManager is in an open
                 // state. As we don't check, we are likely just to time out here, as I believe is being seen in QPID-1255
             }
-            _protocolSession.writeFrame(frame);
+            writeFrame(frame);
 
             return listener.blockForFrame(timeout);
             // When control resumes before this line, a reply will have been received
@@ -723,38 +699,36 @@
         final AMQFrame frame = body.generateFrame(0);
 
         //If the connection is already closed then don't do a syncWrite
-        if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
-        {
-            _protocolSession.closeProtocolSession(false);
-        }
-        else
+        if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
         {
             try
             {
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _protocolSession.closeProtocolSession();
+                _networkDriver.close();
+                closed();
             }
             catch (AMQTimeoutException e)
             {
-                _protocolSession.closeProtocolSession(false);
+                closed();
             }
             catch (FailoverException e)
             {
                 _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
             }
         }
+        _poolReference.releaseExecutorService();
     }
 
     /** @return the number of bytes read from this protocol session */
     public long getReadBytes()
     {
-        return _protocolSession.getIoSession().getReadBytes();
+        return _readBytes;
     }
 
     /** @return the number of bytes written to this protocol session */
     public long getWrittenBytes()
     {
-        return _protocolSession.getIoSession().getWrittenBytes();
+        return _writtenBytes;
     }
 
     public void failover(String host, int port)
@@ -807,6 +781,7 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
+        _stateManager.setProtocolSession(_protocolSession);
     }
 
     public AMQProtocolSession getProtocolSession()
@@ -843,4 +818,35 @@
     {
         return _protocolSession.getProtocolVersion();
     }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _networkDriver.getRemoteAddress();
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _networkDriver.getLocalAddress();
+    }
+
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _networkDriver = driver;
+    }
+    
+    /** @param delay delay in seconds (not ms) */
+    void initHeartbeats(int delay)
+    {
+        if (delay > 0)
+        {
+            getNetworkDriver().setMaxWriteIdle(delay);
+            getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+        }
+    }
+
+    public NetworkDriver getNetworkDriver()
+    {
+        return _networkDriver;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org