You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/16 12:06:56 UTC

svn commit: r815704 [1/2] - in /qpid/branches/java-network-refactor/qpid/java: broker/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/test/java/org/apache/qpid/server/configuration/ broker/src/test/java/org/apache/qpid/server/protocol...

Author: aidan
Date: Wed Sep 16 10:06:55 2009
New Revision: 815704

URL: http://svn.apache.org/viewvc?rev=815704&view=rev
Log:
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.

Allow an existing SocketConnector to be passed into a MINANetworkDriver, for
use with the ExistingSocket bit of TransportConnection.

Move the ExistingSocket stuff to one place, use MINANetworkDriver in
TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession.

Move fireAsynchEvent to Job

Add getLocalAddress to AMQProtocolEngine

Move TestNetworkDriver to common

Use correct class for logger in AMQProtocolEngine

Check the exception is thrown properly in SimpleACLTest, make it a little less
prone to obscure race conditions.


Added:
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
      - copied, changed from r815682, qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
Removed:
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
Modified:
    qpid/branches/java-network-refactor/qpid/java/broker/build.xml
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
    qpid/branches/java-network-refactor/qpid/java/client/build.xml
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
    qpid/branches/java-network-refactor/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
    qpid/branches/java-network-refactor/qpid/java/systests/build.xml
    qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
    qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java

Modified: qpid/branches/java-network-refactor/qpid/java/broker/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/build.xml?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/build.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/build.xml Wed Sep 16 10:06:55 2009
@@ -21,6 +21,7 @@
 <project name="AMQ Broker" default="build">
 
     <property name="module.depends" value="management/common common"/>
+    <property name="module.test.depends" value="common/test" />
     <property name="module.main" value="org.apache.qpid.server.Main"/>
 
     <import file="../module.xml"/>

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed Sep 16 10:06:55 2009
@@ -31,8 +31,6 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.JMException;
@@ -51,7 +49,6 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
@@ -94,7 +91,7 @@
 
 public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
 {
-    private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+    private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
@@ -180,6 +177,7 @@
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
         _actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+        _poolReference.acquireExecutorService();
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +210,7 @@
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            fireAsynchEvent(_readJob, new Event(new Runnable()
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
             {
                 @Override
                 public void run()
@@ -463,7 +461,7 @@
         final ByteBuffer buf = frame.toNioByteBuffer();
         _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        fireAsynchEvent(_writeJob, new Event(new Runnable()
+        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
         {
             @Override
             public void run()
@@ -687,6 +685,10 @@
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     public void closeSession() throws AMQException
     {
+        if (CurrentActor.get() == null)
+        {
+            CurrentActor.set(_actor);
+        }
         if (!_closed)
         {
             if (_virtualHost != null)
@@ -907,6 +909,11 @@
     public SocketAddress getRemoteAddress()
     {
         return _networkDriver.getRemoteAddress();
+    }    
+
+    public SocketAddress getLocalAddress()
+    {
+        return _networkDriver.getLocalAddress();
     }
 
     public MethodRegistry getMethodRegistry()
@@ -990,7 +997,7 @@
     {
         // Do nothing
     }
-
+    
     @Override
     public long getReadBytes()
     {
@@ -1017,38 +1024,6 @@
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
     
-    /**
-     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
-     *
-     * @param job The job.
-     * @param event   The event to hand off asynchronously.
-     */
-    void fireAsynchEvent(Job job, Event event)
-    {
-
-        job.add(event);
-
-        final ExecutorService pool = _poolReference .getPool();
-
-        if(pool == null)
-        {
-            return;
-        }
-
-        // rather than perform additional checks on pool to check that it hasn't shutdown.
-        // catch the RejectedExecutionException that will result from executing on a shutdown pool
-        if (job.activate())
-        {
-            try
-            {
-                pool.execute(job);
-            }
-            catch(RejectedExecutionException e)
-            {
-                _logger.warn("Thread pool shutdown while tasks still outstanding");
-            }
-        }
-
-    }
+    
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Wed Sep 16 10:06:55 2009
@@ -35,11 +35,11 @@
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
 
 public class ServerConfigurationTest extends TestCase
 {
@@ -793,12 +793,12 @@
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
         
         TestNetworkDriver testDriver = new TestNetworkDriver();
-        testDriver.setAddress("127.0.0.1");
+        testDriver.setRemoteAddress("127.0.0.1");
         
         AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
         
-        testDriver.setAddress("127.1.2.3");
+        testDriver.setRemoteAddress("127.1.2.3");
         session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
     }
@@ -867,7 +867,7 @@
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
 
         TestNetworkDriver testDriver = new TestNetworkDriver();
-        testDriver.setAddress("127.0.0.1");
+        testDriver.setRemoteAddress("127.0.0.1");
         
         AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
         session.setNetworkDriver(testDriver);
@@ -935,7 +935,7 @@
 
         // Test config
         TestNetworkDriver testDriver = new TestNetworkDriver();
-        testDriver.setAddress("127.0.0.1");
+        testDriver.setRemoteAddress("127.0.0.1");
         VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
         VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
         AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Wed Sep 16 10:06:55 2009
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
 {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Wed Sep 16 10:06:55 2009
@@ -32,12 +32,12 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
 
 public class FirewallPluginTest extends TestCase
 {
@@ -90,7 +90,7 @@
         super.setUp();
         _store = new TestableMemoryMessageStore();
         _testDriver = new TestNetworkDriver();
-        _testDriver.setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
 
         // Retreive VirtualHost from the Registry
         VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
@@ -167,7 +167,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -182,7 +182,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -195,7 +195,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -208,7 +208,7 @@
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
 
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -231,7 +231,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -254,7 +254,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
 
@@ -268,7 +268,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -282,7 +282,7 @@
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("192.168.23.23");
+        _testDriver.setRemoteAddress("192.168.23.23");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     
@@ -292,11 +292,11 @@
         firstRule.setAccess("allow");
         firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
         FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
-        _testDriver.setAddress("10.0.0.1");
+        _testDriver.setRemoteAddress("10.0.0.1");
         assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
         
         // Set session IP so that we're connected from the right address
-        _testDriver.setAddress("127.0.0.1");
+        _testDriver.setRemoteAddress("127.0.0.1");
         assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
     }
     

Modified: qpid/branches/java-network-refactor/qpid/java/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/build.xml?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/build.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/build.xml Wed Sep 16 10:06:55 2009
@@ -21,6 +21,7 @@
 <project name="AMQ Client" default="build">
 
   <property name="module.depends" value="common"/>
+  <property name="module.test.depends" value="common/test" />
   <property name="module.genpom" value="true"/>
   <property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
 

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Wed Sep 16 10:06:55 2009
@@ -97,7 +97,7 @@
         {
             _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
         }
-        
+        _conn._protocolHandler.getProtocolSession().init();
         // this blocks until the connection has been set up or when an error
         // has prevented the connection being set up
 

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Wed Sep 16 10:06:55 2009
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.client.failover;
 
-import org.apache.mina.common.IoSession;
-
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQStateManager;
@@ -81,9 +79,6 @@
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
 
-    /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
-    private final IoSession _session;
-
     /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
     private AMQProtocolHandler _amqProtocolHandler;
 
@@ -99,10 +94,9 @@
      * @param amqProtocolHandler The protocol handler that spans the failover.
      * @param session            The MINA session, for the failing connection.
      */
-    public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+    public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
     {
         _amqProtocolHandler = amqProtocolHandler;
-        _session = session;
     }
 
     /**
@@ -221,7 +215,7 @@
                     _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
                     /*try
                     {*/
-                    _amqProtocolHandler.exceptionCaught(_session, e);
+                    _amqProtocolHandler.exception(e);
                     /*}
                     catch (Exception ex)
                     {

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Sep 16 10:06:55 2009
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
@@ -48,16 +44,25 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -120,7 +125,7 @@
  * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
  * that lifecycles of the fields match lifecycles of their containing objects.
  */
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +142,7 @@
     private volatile AMQProtocolSession _protocolSession;
 
     /** Holds the state of the protocol session. */
-    private AMQStateManager _stateManager = new AMQStateManager();
+    private AMQStateManager _stateManager;
 
     /** Holds the method listeners, */
     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +171,15 @@
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
-
+    private AMQCodecFactory _codecFactory;
+    private Job _readJob;
+    private Job _writeJob;
+    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+    private NetworkDriver _networkDriver;
+    
+    private long _writtenBytes;
+    private long _readBytes;
+    
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -175,86 +188,14 @@
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
-    }
-
-    /**
-     * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
-     * session, which filters the events handled by this handler. The filter chain consists of, handing off events
-     * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
-     *
-     * @param session The MINA session.
-     *
-     * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
-     */
-    public void sessionCreated(IoSession session) throws Exception
-    {
-        _logger.debug("Protocol session created for session " + System.identityHashCode(session));
-        _failoverHandler = new FailoverHandler(this, session);
-
-        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
-
-        if (Boolean.getBoolean("amqj.shared_read_write_pool"))
-        {
-            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
-        }
-        else
-        {
-            session.getFilterChain().addLast("protocolFilter", pcf);
-        }
-        // we only add the SSL filter where we have an SSL connection
-        if (_connection.getSSLConfiguration() != null)
-        {
-            SSLConfiguration sslConfig = _connection.getSSLConfiguration();
-            SSLContextFactory sslFactory =
-                    new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
-            SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
-            sslFilter.setUseClientMode(true);
-            session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
-        }
-
-        try
-        {
-            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
-            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
-            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
-        }
-        catch (RuntimeException e)
-        {
-            _logger.error(e.getMessage(), e);
-        }
-
-        if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
-        {
-            try
-            {
-                //Add IO Protection Filters
-                IoFilterChain chain = session.getFilterChain();
-
-                session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
-                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-                readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
-                        ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
-                readfilter.attach(chain);
-
-                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-                writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
-                        ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
-                writefilter.attach(chain);
-                session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
-                _logger.info("Using IO Read/Write Filter Protection");
-            }
-            catch (Exception e)
-            {
-                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
-            }
-        }
-        _protocolSession = new AMQProtocolSession(this, session, _connection);
-
-        _stateManager.setProtocolSession(_protocolSession);
-
-        _protocolSession.init();
+        _protocolSession = new AMQProtocolSession(this, _connection);
+        _stateManager = new AMQStateManager(_protocolSession);
+        _codecFactory = new AMQCodecFactory(false, _protocolSession);
+        ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+        _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+        _poolReference.acquireExecutorService();
+        _failoverHandler = new FailoverHandler(this);
     }
 
     /**
@@ -283,12 +224,10 @@
      * may be called first followed by this method. This depends on whether the client was trying to send data at the
      * time of the failure.
      *
-     * @param session The MINA session.
-     *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
      * not otherwise? The above comment doesn't make that clear.
      */
-    public void sessionClosed(IoSession session)
+    public void closed()
     {
         if (_connection.isClosed())
         {
@@ -327,7 +266,8 @@
                 {
                     _logger.debug("sessionClose() not allowed to failover");
                     _connection.exceptionReceived(new AMQDisconnectedException(
-                            "Server closed connection and reconnection " + "not permitted.", null));
+                            "Server closed connection and reconnection " + "not permitted.", 
+                            _stateManager.getLastException()));
                 }
                 else
                 {
@@ -350,43 +290,39 @@
         failoverThread.start();
     }
 
-    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    @Override
+    public void readerIdle()
     {
-        _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
-        if (IdleStatus.WRITER_IDLE.equals(status))
-        {
-            // write heartbeat frame:
-            _logger.debug("Sent heartbeat");
-            session.write(HeartbeatBody.FRAME);
-            HeartbeatDiagnostics.sent();
-        }
-        else if (IdleStatus.READER_IDLE.equals(status))
-        {
-            // failover:
-            HeartbeatDiagnostics.timeout();
-            _logger.warn("Timed out while waiting for heartbeat from peer.");
-            session.close();
-        }
+        _logger.debug("Protocol Session [" + this + "] idle: reader");
+        //  failover:
+        HeartbeatDiagnostics.timeout();
+        _logger.warn("Timed out while waiting for heartbeat from peer.");
+        _networkDriver.close();
+    }
+    
+    @Override
+    public void writerIdle()
+    {
+        _logger.debug("Protocol Session [" + this + "] idle: reader");
+        writeFrame(HeartbeatBody.FRAME);
+        HeartbeatDiagnostics.sent();
     }
 
     /**
-     * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
-     * IOException, MINA will close the connection automatically.
-     *
-     * @param session The MINA session.
-     * @param cause   The exception that triggered this event.
+     * Invoked when any exception is thrown by the NetworkDriver
      */
-    public void exceptionCaught(IoSession session, Throwable cause)
+    public void exception(Throwable cause)
     {
+        _logger.info("AS: HELLO");
         if (_failoverState == FailoverState.NOT_STARTED)
         {
             // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
             if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
-                // this will attemp failover
-
-                sessionClosed(session);
+                // this will attempt failover
+                _networkDriver.close();
+                closed();
             }
             else
             {
@@ -437,6 +373,7 @@
     public void propagateExceptionToAllWaiters(Exception e)
     {
         getStateManager().error(e);
+        
         propagateExceptionToFrameListeners(e);
     }
 
@@ -490,48 +427,84 @@
 
     private static int _messageReceivedCount;
 
-    public void messageReceived(IoSession session, Object message) throws Exception
-    {
-        if (PROTOCOL_DEBUG)
-        {
-            _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
-        }
 
-        if(message instanceof AMQFrame)
+    @Override
+    public void received(ByteBuffer msg)
+    {
+        try
         {
-            final boolean debug = _logger.isDebugEnabled();
-            final long msgNumber = ++_messageReceivedCount;
+            _readBytes += msg.remaining();
+            final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
-            if (debug && ((msgNumber % 1000) == 0))
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
             {
-                _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-            }
-
-            AMQFrame frame = (AMQFrame) message;
-
-            final AMQBody bodyFrame = frame.getBodyFrame();
-
-            HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+                @Override
+                public void run()
+                {
+                    // Decode buffer
 
-            bodyFrame.handle(frame.getChannel(), _protocolSession);
+                    for (AMQDataBlock message : dataBlocks)
+                    {
 
-            _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+                        try
+                        {
+                            if (PROTOCOL_DEBUG)
+                            {
+                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+                            }
+
+                            if(message instanceof AMQFrame)
+                            {
+                                final boolean debug = _logger.isDebugEnabled();
+                                final long msgNumber = ++_messageReceivedCount;
+
+                                if (debug && ((msgNumber % 1000) == 0))
+                                {
+                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+                                }
+
+                                AMQFrame frame = (AMQFrame) message;
+
+                                final AMQBody bodyFrame = frame.getBodyFrame();
+
+                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+                                bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+                                _connection.bytesReceived(_readBytes);
+                            }
+                            else if (message instanceof ProtocolInitiation)
+                            {
+                                // We get here if the server sends a response to our initial protocol header
+                                // suggesting an alternate ProtocolVersion; the server will then close the
+                                // connection.
+                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+                                ProtocolVersion pv = protocolInit.checkVersion();
+                                getConnection().setProtocolVersion(pv);
+
+                                // get round a bug in old versions of qpid whereby the connection is not closed
+                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            e.printStackTrace();
+                            _logger.error("Exception processing frame", e);
+                            propagateExceptionToFrameListeners(e);
+                            exception(e);
+                        }
+                    }
+                }
+            }));
         }
-        else if (message instanceof ProtocolInitiation)
+        catch (Exception e)
         {
-            // We get here if the server sends a response to our initial protocol header
-            // suggesting an alternate ProtocolVersion; the server will then close the
-            // connection.
-            ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-            ProtocolVersion pv = protocolInit.checkVersion();
-            getConnection().setProtocolVersion(pv);
-
-            // get round a bug in old versions of qpid whereby the connection is not closed
-            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            propagateExceptionToFrameListeners(e);
+            exception(e);
         }
     }
 
-    public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+    public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
             throws AMQException
     {
 
@@ -571,32 +544,13 @@
         {
             propagateExceptionToFrameListeners(e);
 
-            exceptionCaught(session, e);
+            exception(e);
         }
 
     }
 
     private static int _messagesOut;
 
-    public void messageSent(IoSession session, Object message) throws Exception
-    {
-        if (PROTOCOL_DEBUG)
-        {
-            _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
-        }
-
-        final long sentMessages = _messagesOut++;
-
-        final boolean debug = _logger.isDebugEnabled();
-
-        if (debug && ((sentMessages % 1000) == 0))
-        {
-            _logger.debug("Sent " + _messagesOut + " protocol messages");
-        }
-
-        _connection.bytesSent(session.getWrittenBytes());
-    }
-
     public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
     {
         return getStateManager().createWaiter(states);
@@ -610,12 +564,34 @@
      */
     public void writeFrame(AMQDataBlock frame)
     {
-        _protocolSession.writeFrame(frame);
+        writeFrame(frame, false);
     }
 
     public void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        _protocolSession.writeFrame(frame, wait);
+        ByteBuffer buf = frame.toNioByteBuffer();
+        _writtenBytes += buf.remaining();
+        _networkDriver.send(buf);
+        if (PROTOCOL_DEBUG)
+        {
+            _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+        }
+
+        final long sentMessages = _messagesOut++;
+
+        final boolean debug = _logger.isDebugEnabled();
+
+        if (debug && ((sentMessages % 1000) == 0))
+        {
+            _logger.debug("Sent " + _messagesOut + " protocol messages");
+        }
+
+        _connection.bytesSent(_writtenBytes);
+        
+        if (wait)
+        {
+            _networkDriver.flush();
+        }
     }
 
     /**
@@ -673,7 +649,7 @@
                 //FIXME: At this point here we should check or before add we should check _stateManager is in an open
                 // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 
             }
-            _protocolSession.writeFrame(frame);
+            writeFrame(frame);
 
             return listener.blockForFrame(timeout);
             // When control resumes before this line, a reply will have been received
@@ -723,20 +699,17 @@
         final AMQFrame frame = body.generateFrame(0);
 
         //If the connection is already closed then don't do a syncWrite
-        if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
-        {
-            _protocolSession.closeProtocolSession(false);
-        }
-        else
+        if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
         {
             try
             {
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _protocolSession.closeProtocolSession();
+                _networkDriver.close();
+                closed();
             }
             catch (AMQTimeoutException e)
             {
-                _protocolSession.closeProtocolSession(false);
+                closed();
             }
             catch (FailoverException e)
             {
@@ -748,13 +721,13 @@
     /** @return the number of bytes read from this protocol session */
     public long getReadBytes()
     {
-        return _protocolSession.getIoSession().getReadBytes();
+        return _readBytes;
     }
 
     /** @return the number of bytes written to this protocol session */
     public long getWrittenBytes()
     {
-        return _protocolSession.getIoSession().getWrittenBytes();
+        return _writtenBytes;
     }
 
     public void failover(String host, int port)
@@ -807,6 +780,7 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
+        _stateManager.setProtocolSession(_protocolSession);
     }
 
     public AMQProtocolSession getProtocolSession()
@@ -843,4 +817,35 @@
     {
         return _protocolSession.getProtocolVersion();
     }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _networkDriver.getRemoteAddress();
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _networkDriver.getLocalAddress();
+    }
+
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _networkDriver = driver;
+    }
+    
+    /** @param delay delay in seconds (not ms) */
+    void initHeartbeats(int delay)
+    {
+        if (delay > 0)
+        {
+            getNetworkDriver().setMaxWriteIdle(delay);
+            getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+        }
+    }
+
+    public NetworkDriver getNetworkDriver()
+    {
+        return _networkDriver;
+    }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Sep 16 10:06:55 2009
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@
 
     protected static final String SASL_CLIENT = "SASLClient";
 
-    protected final IoSession _minaProtocolSession;
-
-    protected WriteFuture _lastWriteFuture;
-
     /**
      * The handler from which this session was created and which is used to handle protocol events. We send failover
      * events to the handler.
@@ -102,28 +94,15 @@
 
     protected final AMQConnection _connection;
 
-    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+    private ConnectionTuneParameters _connectionTuneParameters;
 
-    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
-    {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = protocolSession;
-        _minaProtocolSession.setAttachment(this);
-        // properties of the connection are made available to the event handlers
-        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-        // fixme - real value needed
-        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _protocolVersion = connection.getProtocolVersion();
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
-                                                                           this);
-        _connection = connection;
+    private SaslClient _saslClient;
 
-    }
+    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
     {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = null;
+        _protocolHandler = protocolHandler;        
         _protocolVersion = connection.getProtocolVersion();
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
                                                                            this);
@@ -134,7 +113,7 @@
     {
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
-        _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+        _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
     public String getClientID()
@@ -175,14 +154,9 @@
         return getAMQConnection().getPassword();
     }
 
-    public IoSession getIoSession()
-    {
-        return _minaProtocolSession;
-    }
-
     public SaslClient getSaslClient()
     {
-        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);    
+        return _saslClient;    
     }
 
     /**
@@ -192,28 +166,21 @@
      */
     public void setSaslClient(SaslClient client)
     {
-        if (client == null)
-        {
-            _minaProtocolSession.removeAttribute(SASL_CLIENT);
-        }
-        else
-        {
-            _minaProtocolSession.setAttribute(SASL_CLIENT, client);
-        }
+        _saslClient = client;
     }
 
     public ConnectionTuneParameters getConnectionTuneParameters()
     {
-        return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+        return _connectionTuneParameters;
     }
 
     public void setConnectionTuneParameters(ConnectionTuneParameters params)
     {
-        _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+        _connectionTuneParameters = params;
         AMQConnection con = getAMQConnection();
         con.setMaximumChannelCount(params.getChannelMax());
         con.setMaximumFrameSize(params.getFrameMax());
-        initHeartbeats((int) params.getHeartbeat());
+        _protocolHandler.initHeartbeats((int) params.getHeartbeat());
     }
 
     /**
@@ -335,21 +302,12 @@
      */
     public void writeFrame(AMQDataBlock frame)
     {
-        writeFrame(frame, false);
+        _protocolHandler.writeFrame(frame);
     }
 
     public void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        WriteFuture f = _minaProtocolSession.write(frame);
-        if (wait)
-        {
-            // fixme -- time out?
-            f.join();
-        }
-        else
-        {
-            _lastWriteFuture = f;
-        }
+        _protocolHandler.writeFrame(frame, wait);
     }
 
     /**
@@ -407,33 +365,12 @@
 
     public AMQConnection getAMQConnection()
     {
-        return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+        return _connection;
     }
 
-    public void closeProtocolSession()
+    public void closeProtocolSession() throws AMQException
     {
-        closeProtocolSession(true);
-    }
-
-    public void closeProtocolSession(boolean waitLast)
-    {
-        _logger.debug("Waiting for last write to join.");
-        if (waitLast && (_lastWriteFuture != null))
-        {
-            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        }
-
-        _logger.debug("Closing protocol session");
-        
-        final CloseFuture future = _minaProtocolSession.close();
-
-        // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
-        // then wait for the connection to close.
-        // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
-        // error now shouldn't matter.
-
-        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        _protocolHandler.closeConnection(0);
     }
 
     public void failover(String host, int port)
@@ -449,22 +386,11 @@
             id = _queueId++;
         }
         // get rid of / and : and ; from address for spec conformance
-        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+        String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
 
         return new AMQShortString("tmp_" + localAddress + "_" + id);
     }
 
-    /** @param delay delay in seconds (not ms) */
-    void initHeartbeats(int delay)
-    {
-        if (delay > 0)
-        {
-            _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
-            _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
-            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
-        }
-    }
-
     public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
     {
         final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@
 
     public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
     {
-        _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+        _protocolHandler.methodBodyReceived(channel, amqMethodBody);
     }
 
     public void notifyError(Exception error)

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Wed Sep 16 10:06:55 2009
@@ -24,23 +24,33 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 
+import org.apache.qpid.client.SSLConfiguration;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import sun.net.InetAddressCachePolicy;
+
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.security.GeneralSecurityException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.net.ssl.SSLEngine;
+
 public class SocketTransportConnection implements ITransportConnection
 {
     private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +81,27 @@
         }
 
         final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
-        SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
-        // if we do not use our own thread model we get the MINA default which is to use
-        // its own leader-follower model
-        boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
-        if (readWriteThreading)
-        {
-            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
-        }
-
-        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
-        scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
-        scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
-        _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
-        scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
-        _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
         final InetSocketAddress address;
 
         if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
         {
             address = null;
-
-            Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
-            if (socket != null)
-            {
-                _logger.info("Using existing Socket:" + socket);
-
-                ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Active Socket must be provided for broker " +
-                                                   "with 'socket://<SocketID>' transport:" + brokerDetail);
-            }
         }
         else
         {
             address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
             _logger.info("Attempting connection to " + address);
         }
-
-
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
-        // wait for connection to complete
-        if (future.join(brokerDetail.getTimeout()))
-        {
-            // we call getSession which throws an IOException if there has been an error connecting
-            future.getSession();
-        }
-        else
-        {
-            throw new IOException("Timeout waiting for connection.");
-        }
+        
+        SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+        SSLContextFactory sslFactory = null;
+        if (sslConfig != null)
+        {
+            sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+        }
+        
+        MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+        driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+        protocolHandler.setNetworkDriver(driver);
     }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Wed Sep 16 10:06:55 2009
@@ -79,7 +79,7 @@
         return _openSocketRegister.remove(socketID);
     }
 
-    public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+    public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
     {
         int transport = getTransport(details.getTransport());
 
@@ -95,7 +95,22 @@
                 {
                     public IoConnector newSocketConnector()
                     {
-                        return new ExistingSocketConnector(1,new QpidThreadExecutor());
+                        ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+                        Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+                        if (socket != null)
+                        {
+                            _logger.info("Using existing Socket:" + socket);
+
+                            ((ExistingSocketConnector) connector).setOpenSocket(socket);
+                        }
+                        else
+                        {
+                            throw new IllegalArgumentException("Active Socket must be provided for broker " +
+                                                               "with 'socket://<SocketID>' transport:" + details);
+                        }
+                        return connector;
                     }
                 });
             case TCP:

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Wed Sep 16 10:06:55 2009
@@ -28,6 +28,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,8 @@
 
     private static int _port;
 
+    private MINANetworkDriver _networkDriver;
+
     public VmPipeTransportConnection(int port)
     {
         _port = port;
@@ -47,16 +50,16 @@
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
     {
         final VmPipeConnector ioConnector = new QpidVmPipeConnector();
-        final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
-        cfg.setThreadModel(ReadWriteThreadModel.getInstance());
 
         final VmPipeAddress address = new VmPipeAddress(_port);
         _logger.info("Attempting connection to " + address);
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
+        _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+        protocolHandler.setNetworkDriver(_networkDriver);
+        ConnectFuture future = ioConnector.connect(address, _networkDriver);
         // wait for connection to complete
         future.join();
         // we call getSession which throws an IOException if there has been an error connecting
         future.getSession();
+        _networkDriver.setProtocolEngine(protocolHandler);
     }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Wed Sep 16 10:06:55 2009
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
 import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@
     {
         //Create a new ProtocolHandler with a fake connection.
         _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
-        _handler.sessionCreated(new MockIoSession());
-
+        _handler.setNetworkDriver(new TestNetworkDriver());
          AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
         _blockFrame = new AMQFrame(0, body);
 

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Wed Sep 16 10:06:55 2009
@@ -21,9 +21,13 @@
 package org.apache.qpid.pool;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -66,6 +70,8 @@
 
     private final boolean _readJob;
 
+    private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+    
     /**
      * Creates a new job that aggregates many continuations together.
      *
@@ -181,4 +187,38 @@
 
         public void notCompleted(final Job job);
     }
+    
+    /**
+     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+     *
+     * @param job The job.
+     * @param event   The event to hand off asynchronously.
+     */
+    public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+    {
+
+        job.add(event);
+
+
+        if(pool == null)
+        {
+            return;
+        }
+
+        // rather than perform additional checks on pool to check that it hasn't shutdown.
+        // catch the RejectedExecutionException that will result from executing on a shutdown pool
+        if (job.activate())
+        {
+            try
+            {
+                pool.execute(job);
+            }
+            catch(RejectedExecutionException e)
+            {
+                _logger.warn("Thread pool shutdown while tasks still outstanding");
+            }
+        }
+
+    }
+    
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Wed Sep 16 10:06:55 2009
@@ -37,6 +37,9 @@
  
    // Returns the remote address of the NetworkDriver 
    SocketAddress getRemoteAddress();
+
+   // Returns the local address of the NetworkDriver 
+   SocketAddress getLocalAddress();
  
    // Returns number of bytes written 
    long getWrittenBytes();

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java Wed Sep 16 10:06:55 2009
@@ -24,8 +24,6 @@
 import java.net.InetAddress;
 import java.net.SocketAddress;
 
-import javax.net.ssl.SSLEngine;
-
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.ssl.SSLContextFactory;
@@ -33,13 +31,14 @@
 public interface NetworkDriver extends Sender<java.nio.ByteBuffer> 
 { 
    // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to  
-   // it using the SSLEngine if provided 
+   // it using the SSLContextFactory if provided 
    void open(int port, InetAddress destination, ProtocolEngine engine,
-           NetworkDriverConfiguration config, SSLEngine sslEngine)
+           NetworkDriverConfiguration config, SSLContextFactory sslFactory)
    throws OpenException; 
    
    // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which 
-   // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided 
+   // processes incoming connections with ProtocolEngines and SSLEngines created from the factories 
+   // (in the case of an SSLContextFactory, if provided) 
    void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,  
               NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; 
  

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java Wed Sep 16 10:06:55 2009
@@ -21,7 +21,9 @@
 
 package org.apache.qpid.transport;
 
-public class OpenException extends Exception
+import java.io.IOException;
+
+public class OpenException extends IOException
 {
 
     public OpenException(String string, Throwable lastException)

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Wed Sep 16 10:06:55 2009
@@ -33,6 +33,7 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
@@ -71,7 +72,7 @@
     private int _processors = 4;
     private boolean _executorPool = false;
     private SSLContextFactory _sslFactory = null;
-    private SocketConnector _socketConnector;
+    private IoConnector _socketConnector;
     private IoAcceptor _acceptor;
     private IoSession _ioSession;
     private ProtocolEngineFactory _factory;
@@ -101,6 +102,7 @@
         _protectIO = protectIO;
         _protocolEngine = protocolEngine;
         _ioSession = session;
+        _ioSession.setAttachment(_protocolEngine);
     }
     
     public MINANetworkDriver()
@@ -108,6 +110,17 @@
 
     }
 
+    public MINANetworkDriver(IoConnector ioConnector)
+    {
+        _socketConnector = ioConnector;
+    }
+    
+    public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+    {
+        _socketConnector = ioConnector;
+        _protocolEngine = engine;
+    }
+
     public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
             NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
     {
@@ -188,8 +201,13 @@
     
 
     public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
-            SSLEngine sslEngine) throws OpenException
+            SSLContextFactory sslFactory) throws OpenException
     {
+        if (sslFactory != null)
+        {
+            _sslFactory = sslFactory;
+        }
+        
         if (_useNIO)
         {
             _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -207,7 +225,6 @@
         {
             org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
         }
-        
 
         SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
 
@@ -229,7 +246,11 @@
         // one SocketConnector per connection at the moment anyway). This allows
         // short-running
         // clients (like unit tests) to complete quickly.
-        _socketConnector.setWorkerTimeout(0);
+        if (_socketConnector instanceof SocketConnector)
+        {
+            ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+        }   
+        
         ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
         future.join();
         if (!future.isConnected())
@@ -333,56 +354,54 @@
 
     public void sessionCreated(IoSession protocolSession) throws Exception
     {
-        if (_acceptingConnections)
+        // Configure the session with SSL if necessary
+        SessionUtil.initialize(protocolSession);
+        if (_executorPool)
         {
-            // Configure the session with SSL if necessary
-            SessionUtil.initialize(protocolSession);
-            if (_executorPool)
+            if (_sslFactory != null)
             {
-                if (_sslFactory != null)
-                {
-                    protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
-                            new SSLFilter(_sslFactory.buildServerContext()));
-                }
+                protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+                        new SSLFilter(_sslFactory.buildServerContext()));
             }
-            else
+        }
+        else
+        {
+            if (_sslFactory != null)
             {
-                if (_sslFactory != null)
-                {
-                    protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
-                            new SSLFilter(_sslFactory.buildServerContext()));
-                }
+                protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+                        new SSLFilter(_sslFactory.buildServerContext()));
             }
+        }
+        // Do we want to have read/write buffer limits?
+        if (_protectIO)
+        {
+            //Add IO Protection Filters
+            IoFilterChain chain = protocolSession.getFilterChain();
 
-            // Do we want to have read/write buffer limits?
-            if (_protectIO)
-            {
-                //Add IO Protection Filters
-                IoFilterChain chain = protocolSession.getFilterChain();
+            protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
 
-                protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+            ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+            readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+            readfilter.attach(chain);
 
-                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
-                readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
-                readfilter.attach(chain);
+            WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+            writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+            writefilter.attach(chain);
 
-                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-                writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
-                writefilter.attach(chain);
+            protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+        }
 
-                protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-            }
-            
-            if (_ioSession == null)
-            {
-                _ioSession = protocolSession;
-            }
-            
+        if (_ioSession == null)
+        {
+            _ioSession = protocolSession;
+        }
+        
+        if (_acceptingConnections)
+        {
             // Set up the protocol engine
             ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
             MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
             protocolEngine.setNetworkDriver(newDriver);
-            protocolSession.setAttachment(protocolEngine);
         }
     }
 
@@ -409,4 +428,13 @@
         _acceptingConnections = acceptingConnections;
     }
 
+    public void setProtocolEngine(ProtocolEngine protocolEngine)
+    {
+        _protocolEngine = protocolEngine;
+        if (_ioSession != null)
+        {
+            _ioSession.setAttachment(protocolEngine);
+        }
+    }
+
 }

Copied: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java (from r815682, qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java?p2=qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java&p1=qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java&r1=815682&r2=815704&rev=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java Wed Sep 16 10:06:55 2009
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.transport;
 
 import java.net.BindException;
 import java.net.InetAddress;
@@ -28,14 +28,9 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.net.ssl.SSLEngine;
-
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
 
 /**
  * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -44,21 +39,17 @@
 public class TestNetworkDriver implements NetworkDriver
 {
     private final ConcurrentMap attributes = new ConcurrentHashMap();
-    private String _address = "127.0.0.1";
+    private String _remoteAddress = "127.0.0.1";
+    private String _localAddress = "127.0.0.1";
     private int _port = 1;
 
     public TestNetworkDriver()
     {
     }
 
-    public void setAddress(String string)
-    {
-        this._address = string;
-    }
-
-    public String getAddress()
+    public void setRemoteAddress(String string)
     {
-        return _address;
+        this._remoteAddress = string;
     }
 
     public void setPort(int _port)
@@ -79,16 +70,16 @@
 
     public SocketAddress getLocalAddress()
     {
-        return new InetSocketAddress(_address, _port);
+        return new InetSocketAddress(_localAddress, _port);
     }
 
     public SocketAddress getRemoteAddress()
     {
-        return new InetSocketAddress(_address, _port);
+        return new InetSocketAddress(_remoteAddress, _port);
     }
 
     public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
-            SSLEngine sslEngine) throws OpenException
+            SSLContextFactory sslFactory) throws OpenException
     {
         
     }
@@ -123,4 +114,9 @@
         
     }
 
+    public void setLocalAddress(String localAddress)
+    {
+        _localAddress = localAddress;
+    }
+
 }

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Wed Sep 16 10:06:55 2009
@@ -382,6 +382,18 @@
                 return null;
             }
         }
+        
+        public SocketAddress getLocalAddress()
+        {            
+            if (_driver != null)
+            {
+                return _driver.getLocalAddress();
+            } 
+            else
+            {
+                return null;
+            }
+        }
 
         public long getWrittenBytes()
         {
@@ -459,6 +471,7 @@
         {
             return _closed;
         }
+
     }
 
     private class EchoProtocolEngine extends CountingProtocolEngine

Modified: qpid/branches/java-network-refactor/qpid/java/systests/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/build.xml?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/build.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/build.xml Wed Sep 16 10:06:55 2009
@@ -20,7 +20,7 @@
  -->
 <project name="System Tests" default="build">
 
-    <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/>
+    <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/>
     <property name="module.test.src" location="src/main/java"/>
     <property name="module.test.excludes"
               value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=815704&r1=815703&r2=815704&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Wed Sep 16 10:06:55 2009
@@ -22,6 +22,9 @@
 package org.apache.qpid.server.security.acl;
 
 import junit.framework.TestCase;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.*;
 import org.apache.qpid.framing.AMQShortString;
@@ -34,11 +37,17 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
+
 import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-public class SimpleACLTest extends QpidTestCase implements ConnectionListener
+public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener
 {
     private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672";
+    private ArrayList<Exception> _thrownExceptions = new ArrayList<Exception>();
+    private CountDownLatch _awaitError = new CountDownLatch(51);
 
     public void setUp() throws Exception
     {
@@ -268,7 +277,7 @@
         }
     }
 
-    public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException
+    public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, InterruptedException
     {
         try
         {
@@ -276,41 +285,38 @@
 
             ((AMQConnection) conn).setConnectionListener(this);
 
-            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
             conn.start();
-
+            conn.setExceptionListener(this);
             MessageProducer sender = ((AMQSession) session).createProducer(null);
 
-            Queue queue = session.createQueue("Invalid");
-
+            Queue queue = session.createQueue("NewQueueThatIDoNotHaveRightsToPublishToo");
+            
             // Send a message that we will wait to be sent, this should give the broker time to close the connection
             // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
             // queue existence.
             ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
                                                                 DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
 
-            // Test the connection with a valid consumer
-            // This may fail as the session may be closed before the queue or the consumer created.
-            Queue temp = session.createTemporaryQueue();
-
-            session.createConsumer(temp).close();
-
-            //Connection should now be closed and will throw the exception caused by the above send
-            conn.close();
-
-            fail("Close is not expected to succeed.");
+            _awaitError.await(1, TimeUnit.SECONDS);
         }
         catch (JMSException e)
         {
-            Throwable cause = e.getLinkedException();
-            if (!(cause instanceof AMQAuthenticationException))
+            fail("Unknown exception thrown:" + e.getMessage());
+        }
+        boolean foundCorrectException = false;
+        for (Exception cause : _thrownExceptions)
+        {
+            if (cause instanceof JMSException)
             {
-                e.printStackTrace();
+                if (((JMSException) cause).getLinkedException() instanceof AMQAuthenticationException)
+                {
+                    foundCorrectException = true;
+                }
             }
-            assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
-            assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
         }
+        assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
     }
 
     public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
@@ -657,4 +663,10 @@
     public void failoverComplete()
     {
     }
+
+    public void onException(JMSException arg0)
+    {
+        _thrownExceptions.add(arg0);
+        _awaitError.countDown();
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org