You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/01 18:27:55 UTC

svn commit: r810110 [1/2] - in /qpid/branches/java-network-refactor/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/connection/ broker/src/ma...

Author: aidan
Date: Tue Sep  1 16:27:52 2009
New Revision: 810110

URL: http://svn.apache.org/viewvc?rev=810110&view=rev
Log:
QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice.


Added:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
      - copied, changed from r810108, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
Removed:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
Modified:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Tue Sep  1 16:27:52 2009
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.server;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
@@ -30,16 +37,9 @@
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
 import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -48,19 +48,13 @@
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.management.LoggingManagementMBean;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 
 /**
  * Main entry point for AMQPD.
@@ -300,20 +294,6 @@
             _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
                                + " build: " + QpidProperties.getBuildVersion());
 
-            ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
-            // the MINA default is currently to use the pooled allocator although this may change in future
-            // once more testing of the performance of the simple allocator has been done
-            if (!serverConfig.getEnablePooledAllocator())
-            {
-                ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
-            }
-
-            if (serverConfig.getUseBiasedWrites())
-            {
-                System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
-            }
-
             int port = serverConfig.getPort();
 
             String portStr = commandLine.getOptionValue("p");
@@ -329,7 +309,54 @@
                 }
             }
 
-            bind(port, serverConfig);
+            String bindAddr = commandLine.getOptionValue("b");
+            if (bindAddr == null)
+            {
+                bindAddr = serverConfig.getBind();
+            }
+            InetAddress bindAddress = null;
+            if (bindAddr.equals("wildcard"))
+            {
+                bindAddress = new InetSocketAddress(port).getAddress();
+            }
+            else
+            {
+                bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+            }
+
+            String keystorePath = serverConfig.getKeystorePath();
+            String keystorePassword = serverConfig.getKeystorePassword();
+            String certType = serverConfig.getCertType();
+            SSLContextFactory sslFactory = null;
+            boolean isSsl = false;
+            
+            if (!serverConfig.getSSLOnly())
+            {
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), 
+                            serverConfig.getNetworkConfiguration(), null);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), 
+                                                              new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+            }
+            
+            if (serverConfig.getEnableSSL())
+            {
+                sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+                NetworkDriver driver = new MINANetworkDriver();
+                driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, 
+                            new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+                ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), 
+                        new QpidAcceptor(driver,"TCP"));
+                CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+            }
+            
+            //fixme  qpid.AMQP should be using qpidproperties to get value
+            _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+                    + " build: " + QpidProperties.getBuildVersion());
+
+            CurrentActor.get().message(BrokerMessages.BRK_1004());
+
         }
         finally
         {
@@ -358,114 +385,6 @@
         }
     }
 
-    protected void bind(int port, ServerConfiguration config) throws BindException
-    {
-        String bindAddr = commandLine.getOptionValue("b");
-        if (bindAddr == null)
-        {
-            bindAddr = config.getBind();
-        }
-
-        try
-        {
-            IoAcceptor acceptor;
-            
-            if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
-            {
-                _logger.warn("Using Qpid Multithreaded IO Processing");
-                acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
-            }
-            else
-            {
-                _logger.warn("Using Mina IO Processing");
-                acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
-            }
-            
-            SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
-            SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
-            sc.setReceiveBufferSize(config.getReceiveBufferSize());
-            sc.setSendBufferSize(config.getWriteBufferSize());
-            sc.setTcpNoDelay(config.getTcpNoDelay());
-
-            // if we do not use the executor pool threading model we get the default leader follower
-            // implementation provided by MINA
-            if (config.getEnableExecutorPool())
-            {
-                sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
-            }
-
-            if (!config.getEnableSSL() || !config.getSSLOnly())
-            {
-                AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
-                InetSocketAddress bindAddress;
-                if (bindAddr.equals("wildcard"))
-                {
-                    bindAddress = new InetSocketAddress(port);
-                }
-                else
-                {
-                    bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
-                }
-
-                bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
-                //fixme  qpid.AMQP should be using qpidproperties to get value
-                _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
-            }
-
-            if (config.getEnableSSL())
-            {
-                AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
-                try
-                {
-
-                    bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
-                    //fixme  qpid.AMQP should be using qpidproperties to get value
-                    _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
-                }
-                catch (IOException e)
-                {
-                    _brokerLogger.error("Unable to listen on SSL port: " + e, e);
-                }
-            }
-
-            //fixme  qpid.AMQP should be using qpidproperties to get value
-            _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
-                               + " build: " + QpidProperties.getBuildVersion());
-
-            CurrentActor.get().message(BrokerMessages.BRK_1004());
-
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unable to bind service to registry: " + e, e);
-            //fixme this need tidying up
-            throw new BindException(e.getMessage());
-        }
-    }
-
-    /**
-     * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
-     *
-     * @param acceptor
-     * @param bindAddress
-     * @param handler
-     * @param sconfig
-     *
-     * @throws IOException from the acceptor.bind command
-     */
-    private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
-    {
-        acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
-        CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
-        ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
-    }
-
     public static void main(String[] args)
     {
         //if the -Dlog4j.configuration property has not been set, enable the init override

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Tue Sep  1 16:27:52 2009
@@ -38,6 +38,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -564,7 +565,7 @@
 
     public boolean getSSLOnly()
     {
-        return getConfig().getBoolean("connector.ssl.sslOnly", true);
+        return getConfig().getBoolean("connector.ssl.sslOnly", false);
     }
 
     public int getSSLPort()
@@ -613,4 +614,57 @@
                    getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
                            DEFAULT_HOUSEKEEPING_PERIOD));
     }
+
+    public NetworkDriverConfiguration getNetworkConfiguration()
+    {
+        return new NetworkDriverConfiguration()
+        {
+            
+            public Integer getTrafficClass()
+            {
+                return null;
+            }
+            
+            public Boolean getTcpNoDelay()
+            {
+                // Can't call parent getTcpNoDelay since it just calls this one
+                return getConfig().getBoolean("connector.tcpNoDelay", true);
+            }
+            
+            public Integer getSoTimeout()
+            {
+                return null;
+            }
+            
+            public Integer getSoLinger()
+            {
+                return null;
+            }
+            
+            public Integer getSendBufferSize()
+            {
+                return getBufferWriteLimit();
+            }
+            
+            public Boolean getReuseAddress()
+            {
+                return null;
+            }
+            
+            public Integer getReceiveBufferSize()
+            {
+                return getBufferReadLimit();
+            }
+            
+            public Boolean getOOBInline()
+            {
+                return null;
+            }
+            
+            public Boolean getKeepAlive()
+            {
+                return null;
+            }
+        };
+    }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Tue Sep  1 16:27:52 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.connection;
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQConnectionException;

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Tue Sep  1 16:27:52 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.connection;
 
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 

Copied: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (from r810108, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?p2=qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java&p1=qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r1=810108&r2=810110&rev=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Sep  1 16:27:52 2009
@@ -20,11 +20,25 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
 import org.apache.log4j.Logger;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
@@ -36,8 +50,11 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
@@ -46,18 +63,23 @@
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -67,22 +89,10 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
 
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
@@ -94,8 +104,6 @@
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
 
-    private final IoSession _minaProtocolSession;
-
     private AMQShortString _contextKey;
 
     private AMQShortString _clientVersion = null;
@@ -135,47 +143,43 @@
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
-
-    private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
-    private org.apache.mina.common.WriteFuture _lastWriteFuture;
-
+    
     // Create a simple ID that increments for ever new Session
     private final long _sessionID = idGenerator.getAndIncrement();
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
 
+    private NetworkDriver _networkDriver;
+
+    private long _lastIoTime;
+
+    private long _writtenBytes;
+    private long _readBytes;
+    
+    private Job _readJob;
+    private Job _writeJob;
+
+    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
-            throws AMQException
+    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
-        _minaProtocolSession = session;
-        session.setAttachment(this);
-
-        _codecFactory = codecFactory;
+        _networkDriver = driver;
+        
+        _codecFactory = new AMQCodecFactory(true, this);
+
+        ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+        _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
-
         _actor.message(ConnectionMessages.CON_1001(null, null, false, false));
-
-        try
-        {
-            IoServiceConfig config = session.getServiceConfig();
-            ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
-            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
-            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
-        }
-        catch (RuntimeException e)
-        {
-            e.printStackTrace();
-            throw e;
-
-        }
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -191,16 +195,6 @@
         }
     }
 
-    public IoSession getIOSession()
-    {
-        return _minaProtocolSession;
-    }
-
-    public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
-    {
-        return (AMQProtocolSession) minaProtocolSession.getAttachment();
-    }
-
     public long getSessionID()
     {
         return _sessionID;
@@ -211,6 +205,42 @@
         return _actor;
     }
 
+    @Override
+    public void received(final ByteBuffer msg)
+    {
+        _lastIoTime = System.currentTimeMillis();
+        try
+        {
+            final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+            fireAsynchEvent(_readJob, new Event(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    // Decode buffer
+
+                    for (AMQDataBlock dataBlock : dataBlocks)
+                    {
+                        try
+                        {
+                            dataBlockReceived(dataBlock);
+                        }
+                        catch (Exception e)
+                        {
+                            e.printStackTrace();
+                            closeProtocolSession();
+                        }
+                    }
+                }
+            }));
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            closeProtocolSession();
+        }
+    }
+
     public void dataBlockReceived(AMQDataBlock message) throws Exception
     {
         _lastReceived = message;
@@ -314,21 +344,14 @@
                                                                                        null,
                                                                                        mechanisms.getBytes(),
                                                                                        locales.getBytes());
-            _minaProtocolSession.write(responseBody.generateFrame(0));
+            _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
 
         }
         catch (AMQException e)
         {
             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
 
-            _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
-            // TODO: Close connection (but how to wait until message is sent?)
-            // ritchiem 2006-12-04 will this not do?
-            // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-            // future.join();
-            // close connection
-
+            _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
         }
     }
 
@@ -437,8 +460,17 @@
     public void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-
-        _lastWriteFuture = _minaProtocolSession.write(frame);
+        final ByteBuffer buf = frame.toNioByteBuffer();
+        _lastIoTime = System.currentTimeMillis();
+        _writtenBytes += buf.remaining();
+        fireAsynchEvent(_writeJob, new Event(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                _networkDriver.send(buf);
+            }
+        }));
     }
 
     public AMQShortString getContextKey()
@@ -539,7 +571,7 @@
     {
         _maxNoOfChannels = value;
     }
-
+    
     public void commitTransactions(AMQChannel channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
@@ -555,7 +587,7 @@
             channel.rollback();
         }
     }
-
+    
     /**
      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
      * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -628,8 +660,8 @@
     {
         if (delay > 0)
         {
-            _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
-            _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+            _networkDriver.setMaxWriteIdle(delay);
+            _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
         }
     }
 
@@ -672,7 +704,7 @@
             {
                 task.doTask(this);
             }
-
+            
             _closed = true;
 
             CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
@@ -699,21 +731,7 @@
 
     public void closeProtocolSession()
     {
-        closeProtocolSession(true);
-    }
-
-    public void closeProtocolSession(boolean waitLast)
-    {
-        if (waitLast && (_lastWriteFuture != null))
-        {
-            _logger.debug("Waiting for last write to join.");
-            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        }
-
-        _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
-        final CloseFuture future = _minaProtocolSession.close();
-        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-
+        _networkDriver.close();
         try
         {
             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -726,7 +744,7 @@
 
     public String toString()
     {
-        return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+        return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
     }
 
     public String dump()
@@ -737,7 +755,7 @@
     /** @return an object that can be used to identity */
     public Object getKey()
     {
-        return _minaProtocolSession.getRemoteAddress();
+        return getRemoteAddress();
     }
 
     /**
@@ -748,7 +766,7 @@
      */
     public String getLocalFQDN()
     {
-        SocketAddress address = _minaProtocolSession.getLocalAddress();
+        SocketAddress address = _networkDriver.getLocalAddress();
         // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
         // information is used by SASL primary.
         if (address instanceof InetSocketAddress)
@@ -764,7 +782,7 @@
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
         }
     }
-
+    
     public SaslServer getSaslServer()
     {
         return _saslServer;
@@ -837,7 +855,7 @@
 
     public Object getClientIdentifier()
     {
-        return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
+        return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
     }
 
     public VirtualHost getVirtualHost()
@@ -867,7 +885,7 @@
     {
         _taskList.remove(task);
     }
-
+    
     public ProtocolOutputConverter getProtocolOutputConverter()
     {
         return _protocolOutputConverter;
@@ -888,7 +906,7 @@
 
     public SocketAddress getRemoteAddress()
     {
-        return _minaProtocolSession.getRemoteAddress();
+        return _networkDriver.getRemoteAddress();
     }
 
     public MethodRegistry getMethodRegistry()
@@ -901,23 +919,136 @@
         return _dispatcher;
     }
 
-    public ProtocolSessionIdentifier getSessionIdentifier()
+    @Override
+    public void closed()
     {
-        return _sessionIdentifier;
+        try
+        {
+            closeSession();
+        }
+        catch (AMQException e)
+        {
+           _logger.error("Could not close protocol engine", e);
+        }
     }
 
-    public String getClientVersion()
+    @Override
+    public void readerIdle()
     {
-        return (_clientVersion == null) ? null : _clientVersion.toString();
+        // Nothing
+    }
+
+    @Override
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _networkDriver = driver;        
+    }
+
+    @Override
+    public void writerIdle()
+    {
+        _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
     }
 
-    public void setSender(Sender<java.nio.ByteBuffer> sender)
+    @Override
+    public void exception(Throwable throwable)
     {
-       // No-op, interface munging between this and AMQProtocolSession
+        if (throwable instanceof AMQProtocolHeaderException)
+        {
+
+            writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+            _networkDriver.close();
+
+            _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+        }
+        else if (throwable instanceof IOException)
+        {
+            _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+        }
+        else
+        {
+            _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+            ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+                        
+            writeFrame(closeBody.generateFrame(0));
+
+            _networkDriver.close();
+        }
     }
 
+    @Override
     public void init()
     {
-       // No-op, interface munging between this and AMQProtocolSession
+        // Do nothing
+    }
+
+    @Override
+    public void setSender(Sender<ByteBuffer> sender)
+    {
+        // Do nothing
+    }
+
+    @Override
+    public long getReadBytes()
+    {
+        return _readBytes;
+    }
+
+    public long getWrittenBytes()
+    {
+        return _writtenBytes;
+    }
+
+    public long getLastIoTime()
+    {
+        return _lastIoTime;
+    }
+
+    public ProtocolSessionIdentifier getSessionIdentifier()
+    {
+        return _sessionIdentifier;
+    }
+    
+    public String getClientVersion()
+    {
+        return (_clientVersion == null) ? null : _clientVersion.toString();
+    }
+    
+    /**
+     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+     *
+     * @param job The job.
+     * @param event   The event to hand off asynchronously.
+     */
+    void fireAsynchEvent(Job job, Event event)
+    {
+
+        job.add(event);
+
+        final ExecutorService pool = _poolReference .getPool();
+
+        if(pool == null)
+        {
+            return;
+        }
+
+        // rather than perform additional checks on pool to check that it hasn't shutdown.
+        // catch the RejectedExecutionException that will result from executing on a shutdown pool
+        if (job.activate())
+        {
+            try
+            {
+                pool.execute(job);
+            }
+            catch(RejectedExecutionException e)
+            {
+                _logger.warn("Thread pool shutdown while tasks still outstanding");
+            }
+        }
+
     }
+    
 }

Added: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java Tue Sep  1 16:27:52 2009
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+    private VirtualHostRegistry _vhosts;
+
+    public AMQProtocolEngineFactory()
+    {
+        this(1);
+    }
+    
+    public AMQProtocolEngineFactory(Integer port)
+    {
+        _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+    }
+   
+    
+    public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+    {
+        return new AMQProtocolEngine(_vhosts, networkDriver);
+    }
+
+}

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Sep  1 16:27:52 2009
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.security.Principal;
+import java.util.List;
 
 
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -210,5 +211,19 @@
     public MethodDispatcher getMethodDispatcher();
 
     public ProtocolSessionIdentifier getSessionIdentifier();
+
+    String getClientVersion();
+
+    long getLastIoTime();
+
+    long getWrittenBytes();
+
+    Long getMaximumNumberOfChannels();
+
+    void setMaximumNumberOfChannels(Long value);
+
+    void commitTransactions(AMQChannel channel) throws AMQException;
+
+    List<AMQChannel> getChannels();
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Sep  1 16:27:52 2009
@@ -79,7 +79,7 @@
 @MBeanDescription("Management Bean for an AMQ Broker Connection")
 public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
 {
-    private AMQMinaProtocolSession _session = null;
+    private AMQProtocolSession _protocolSession = null;
     private String _name = null;
 
     // openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@
         new AMQShortString("Broker Management Console has closed the connection.");
 
     @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
-    public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+    public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
     {
         super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
-        _session = session;
+        _protocolSession = amqProtocolSession;
         String remote = getRemoteAddress();
         remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
         _name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@
 
     public String getClientId()
     {
-        return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+        return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
     }
 
     public String getAuthorizedId()
     {
-        return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+        return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
     }
 
     public String getVersion()
     {
-        return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+        return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
     }
 
     public Date getLastIoTime()
     {
-        return new Date(_session.getIOSession().getLastIoTime());
+        return new Date(_protocolSession.getLastIoTime());
     }
 
     public String getRemoteAddress()
     {
-        return _session.getIOSession().getRemoteAddress().toString();
+        return _protocolSession.getRemoteAddress().toString();
     }
 
     public ManagedObject getParentObject()
     {
-        return _session.getVirtualHost().getManagedObject();
+        return _protocolSession.getVirtualHost().getManagedObject();
     }
 
     public Long getWrittenBytes()
     {
-        return _session.getIOSession().getWrittenBytes();
+        return _protocolSession.getWrittenBytes();
     }
 
     public Long getReadBytes()
     {
-        return _session.getIOSession().getReadBytes();
+        return _protocolSession.getWrittenBytes();
     }
 
     public Long getMaximumNumberOfChannels()
     {
-        return _session.getMaximumNumberOfChannels();
+        return _protocolSession.getMaximumNumberOfChannels();
     }
 
     public void setMaximumNumberOfChannels(Long value)
     {
-        _session.setMaximumNumberOfChannels(value);
+        _protocolSession.setMaximumNumberOfChannels(value);
     }
 
     public String getObjectInstanceName()
@@ -192,13 +192,13 @@
         CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
         try
         {
-            AMQChannel channel = _session.getChannel(channelId);
+            AMQChannel channel = _protocolSession.getChannel(channelId);
             if (channel == null)
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
 
-            _session.commitTransactions(channel);
+            _protocolSession.commitTransactions(channel);
         }
         catch (AMQException ex)
         {
@@ -221,13 +221,13 @@
         CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
         try
         {
-            AMQChannel channel = _session.getChannel(channelId);
+            AMQChannel channel = _protocolSession.getChannel(channelId);
             if (channel == null)
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
 
-            _session.rollbackTransactions(channel);
+            _protocolSession.commitTransactions(channel);
         }
         catch (AMQException ex)
         {
@@ -248,7 +248,7 @@
     public TabularData channels() throws OpenDataException
     {
         TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
-        List<AMQChannel> list = _session.getChannels();
+        List<AMQChannel> list = _protocolSession.getChannels();
 
         for (AMQChannel channel : list)
         {
@@ -274,7 +274,7 @@
     public void closeConnection() throws JMException
     {
 
-        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
         ConnectionCloseBody responseBody =
                 methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
                                                          // replyCode
@@ -301,12 +301,12 @@
 
         try
         {
-            _session.writeFrame(responseBody.generateFrame(0));
+            _protocolSession.writeFrame(responseBody.generateFrame(0));
 
             try
             {
 
-                _session.closeSession();
+                _protocolSession.closeSession();
             }
             catch (AMQException ex)
             {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Sep  1 16:27:52 2009
@@ -258,7 +258,7 @@
             for (InetSocketAddress bindAddress : _acceptors.keySet())
             {
                 QpidAcceptor acceptor = _acceptors.get(bindAddress);
-                acceptor.getIoAcceptor().unbind(bindAddress);
+                acceptor.getNetworkDriver().close();
                 CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
             }
         }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Tue Sep  1 16:27:52 2009
@@ -23,7 +23,6 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
@@ -32,7 +31,6 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -180,13 +178,13 @@
     @Override
     public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
     {
-        if (!(session instanceof AMQMinaProtocolSession))
+        SocketAddress sockAddr = session.getRemoteAddress();
+        if (!(sockAddr instanceof InetSocketAddress))
         {
-            return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
-                                        // mean MINA right now
+            return AuthzResult.ABSTAIN; // We only deal with tcp sessions
         }
 
-        InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+        InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
 
         if (addr == null)
         {
@@ -213,19 +211,6 @@
 
     }
 
-    private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
-    {
-        SocketAddress remote = session.getIOSession().getRemoteAddress();
-        if (remote instanceof InetSocketAddress)
-        {
-            return ((InetSocketAddress) remote).getAddress();
-        }
-        else
-        {
-            return null;
-        }
-    }
-
     @Override
     public void setConfiguration(Configuration config) throws ConfigurationException
     {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Tue Sep  1 16:27:52 2009
@@ -20,21 +20,21 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
 
 public class QpidAcceptor
 {
-    IoAcceptor _acceptor;
+    NetworkDriver _driver;
     String _protocol;
-    public QpidAcceptor(IoAcceptor acceptor, String protocol)
+    public QpidAcceptor(NetworkDriver driver, String protocol)
     {
-        _acceptor = acceptor;
+        _driver = driver;
         _protocol = protocol;
     }
 
-    public IoAcceptor getIoAcceptor()
+    public NetworkDriver getNetworkDriver()
     {
-        return _acceptor;
+        return _driver;
     }
 
     public String toString()

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Tue Sep  1 16:27:52 2009
@@ -20,27 +20,27 @@
  */
 package org.apache.qpid.server.configuration;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
 import junit.framework.TestCase;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.TestNetworkDriver;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-
 public class ServerConfigurationTest extends TestCase
 {
 
@@ -589,12 +589,12 @@
     {
         // Check default
         ServerConfiguration serverConfig = new ServerConfiguration(_config);
-        assertEquals(true, serverConfig.getSSLOnly());
+        assertEquals(false, serverConfig.getSSLOnly());
 
         // Check value we set
-        _config.setProperty("connector.ssl.sslOnly", false);
+        _config.setProperty("connector.ssl.sslOnly", true);
         serverConfig = new ServerConfiguration(_config);
-        assertEquals(false, serverConfig.getSSLOnly());
+        assertEquals(true, serverConfig.getSSLOnly());
     }
 
     public void testGetSSLPort() throws ConfigurationException
@@ -791,16 +791,15 @@
         // Test config
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
         
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setAddress("127.0.0.1");
+        
+        AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
         
-        iosession.setAddress("127.1.2.3");
-        session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        testDriver.setAddress("127.1.2.3");
+        session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
     }
     
@@ -866,12 +865,12 @@
         // Test config
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setAddress("127.0.0.1");
         
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
+        session.setNetworkDriver(testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
     }
 
@@ -935,12 +934,11 @@
         ApplicationRegistry.initialise(reg, 1);
 
         // Test config
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        TestNetworkDriver testDriver = new TestNetworkDriver();
+        testDriver.setAddress("127.0.0.1");
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-        AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+        AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
 
         RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw");

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Tue Sep  1 16:27:52 2009
@@ -44,7 +44,7 @@
     private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
 
     private MessageStore _messageStore = new SkeletonMessageStore();
-    private AMQMinaProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolSession;
     private AMQChannel _channel;
     private AMQProtocolSessionMBean _mbean;
 

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Tue Sep  1 16:27:52 2009
@@ -20,23 +20,22 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.security.Principal;
 
-public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
 {
     // ChannelID(LIST)  -> LinkedList<Pair>
     final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
@@ -44,9 +43,7 @@
 
     public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
     {
-        super(new TestIoSession(),
-              ApplicationRegistry.getInstance().getVirtualHostRegistry(),
-              new AMQCodecFactory(true));
+        super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
 
         _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
 

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Tue Sep  1 16:27:52 2009
@@ -37,7 +37,7 @@
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends TestCase
 {
-	private AMQMinaProtocolSession _session;
+	private AMQProtocolEngine _session;
 
     public void testChannels() throws Exception
     {

Added: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java Tue Sep  1 16:27:52 2009
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestNetworkDriver implements NetworkDriver
+{
+    private final ConcurrentMap attributes = new ConcurrentHashMap();
+    private String _address = "127.0.0.1";
+    private int _port = 1;
+
+    public TestNetworkDriver()
+    {
+    }
+
+    public void setAddress(String string)
+    {
+        this._address = string;
+    }
+
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    public void setPort(int _port)
+    {
+        this._port = _port;
+    }
+
+    public int getPort()
+    {
+        return _port;
+    }
+
+    public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+            NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+    {
+        
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return new InetSocketAddress(_address, _port);
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return new InetSocketAddress(_address, _port);
+    }
+
+    public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+            SSLEngine sslEngine) throws OpenException
+    {
+        
+    }
+
+    public void setMaxReadIdle(int idleTime)
+    {
+        
+    }
+
+    public void setMaxWriteIdle(int idleTime)
+    {
+        
+    }
+
+    public void close()
+    {
+           
+    }
+
+    public void flush()
+    {
+        
+    }
+
+    public void send(ByteBuffer msg)
+    {
+        
+    }
+
+    public void setIdleTimeout(long l)
+    {
+        
+    }
+
+}

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Sep  1 16:27:52 2009
@@ -20,37 +20,34 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import javax.management.Notification;
+
 import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.Notification;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
-import java.util.Set;
-import java.security.Principal;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
@@ -62,7 +59,7 @@
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private VirtualHost _virtualHost;
-    private AMQMinaProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolSession;
     private MessageStore _messageStore = new MemoryMessageStore();
     private StoreContext _storeContext = new StoreContext();
     private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Tue Sep  1 16:27:52 2009
@@ -30,17 +30,14 @@
 import junit.framework.TestCase;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.TestNetworkDriver;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 
 public class FirewallPluginTest extends TestCase
 {
@@ -84,22 +81,22 @@
 
     private TestableMemoryMessageStore _store;
     private VirtualHost _virtualHost;
-    private AMQMinaProtocolSession _session;
+    private AMQProtocolEngine _session;
+    private TestNetworkDriver _testDriver;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
         _store = new TestableMemoryMessageStore();
-        TestIoSession iosession = new TestIoSession();
-        iosession.setAddress("127.0.0.1");
+        _testDriver = new TestNetworkDriver();
+        _testDriver.setAddress("127.0.0.1");
 
         // Retreive VirtualHost from the Registry
         VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
         _virtualHost = virtualHostRegistry.getVirtualHost("test");
 
-        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-        _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);        
+        _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver);
     }
 
     public void tearDown() throws Exception
@@ -170,7 +167,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -185,7 +182,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -198,7 +195,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -211,7 +208,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -234,7 +231,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -257,7 +254,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -271,7 +268,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -285,7 +282,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+        _testDriver.setAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -295,11 +292,11 @@
         firstRule.setAccess("allow");
         firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
-        ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1");
+        _testDriver.setAddress("10.0.0.1");
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+        _testDriver.setAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Sep  1 16:27:52 2009
@@ -191,7 +191,7 @@
         _logger.debug("Protocol session created for session " + System.identityHashCode(session));
         _failoverHandler = new FailoverHandler(this, session);
 
-        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
 
         if (Boolean.getBoolean("amqj.shared_read_write_pool"))
         {

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Sep  1 16:27:52 2009
@@ -31,7 +31,10 @@
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +65,7 @@
 
     private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
 
-    private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+    private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
 
     private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
 
@@ -190,8 +193,6 @@
             _acceptor = new VmPipeAcceptor();
 
             IoServiceConfig config = _acceptor.getDefaultConfig();
-
-            config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
         synchronized (_inVmPipeAddress)
         {
@@ -276,7 +277,10 @@
         {
             Class[] cnstr = {Integer.class};
             Object[] params = {port};
-            provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+            
+            provider = new MINANetworkDriver();
+            ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+            ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
             // Give the broker a second to create
             _logger.info("Created VMBroker Instance:" + port);
         }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Tue Sep  1 16:27:52 2009
@@ -23,6 +23,7 @@
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
  * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
@@ -50,9 +51,9 @@
      * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
      *                                 frame, <tt>false</tt> if it is going to be a standard AMQ data block.
      */
-    public AMQCodecFactory(boolean expectProtocolInitiation)
+    public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
     {
-        _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+        _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
     }
 
     /**
@@ -70,7 +71,7 @@
      *
      * @return The AMQP decoder.
      */
-    public ProtocolDecoder getDecoder()
+    public AMQDecoder getDecoder()
     {
         return _frameDecoder;
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org