You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 19:06:18 UTC

svn commit: r676978 [1/3] - in /incubator/qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/q...

Author: ritchiem
Date: Tue Jul 15 10:06:16 2008
New Revision: 676978

URL: http://svn.apache.org/viewvc?rev=676978&view=rev
Log:
QPID-940,QPID-594,QPID-805,QPID-826 : Updated the client exception handling so that exceptions are not lost. In performing the changes I noted that the AMQStateManager is only used for connection creation in the 08/09 codepath. Now this may be due to the fact that we don't currently need to wait on any other states. We need to improve our testing of error conditions for all protcol versions.

Changes Summary:

The MethodHandlers had their AMQStateManager parameters swapped for AMQSession as that is what they all cared about.
The BlockingMethodFrameListener was used as a basis to create a generic BlockingWaiter which is now used by StateWaiter,
There is probably scope to refactor the AMQStateManager and the parts of the AMQProtocolHandler that deal with the _frameListeners into a generic class but that can be looked at as part of a wider client refactor.

Additionally tests were updated such as SimpleACLTest and ConnectionTest as they were expecting JMSExceptions from the constructor but the JMS API does not demand that and AMQExceptions are now correctly being thrown.

The SimpleACLTest also required a change to AMQSession.
The test calls send which will cause the connection to be closed asynchrously due to a permission violation. As this exception is not expected and is asynchorous to the client code it cannot be directly thrown. The solution is to record this exception in the AMQStateManager, it can tell that there are no waiters for the exception so it can record the value.(Potential exists to alert if the exception is overwritten but I don't think this is required right now)

When the AMQSession checks that the connection is closed it is then possible to check if the current State is CLOSED and if we have an exception sitting in the AMQStateManager. If all these are true we can attach the AMQStateManager exception to the IllegalState Exception that is normally thrown when the Session is closed.

This maintains JMS Compliance and allows us to discover the cause of the failure, SimpleACLTest was updated by removing the IllegalState catch section that was causing the test to silently fail.

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
Removed:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java
Modified:
    incubator/qpid/trunk/qpid/java/010ExcludeList
    incubator/qpid/trunk/qpid/java/010ExcludeList-store
    incubator/qpid/trunk/qpid/java/08ExcludeList
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
    incubator/qpid/trunk/qpid/java/common.xml
    incubator/qpid/trunk/qpid/java/systests/build.xml
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Tue Jul 15 10:06:16 2008
@@ -52,9 +52,9 @@
 org.apache.qpid.server.queue.PriorityTest
 //this test checks explicitly for 0-8 flow control semantics
 org.apache.qpid.test.client.FlowControlTest
-// Client Race Condition
-org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
 // 0-10 c++ broker doesn't implement virtual hosts, or those wackhy exchanges
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
 

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList-store
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList-store?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList-store (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList-store Tue Jul 15 10:06:16 2008
@@ -47,7 +47,6 @@
 org.apache.qpid.server.queue.PriorityTest
 //this test checks explicitly for 0-8 flow control semantics
 org.apache.qpid.test.client.FlowControlTest
-
-// Client Race Condition
+// The default cpp.testprofile does not start the cpp broker with authentication so this test will fail.
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
 

Modified: incubator/qpid/trunk/qpid/java/08ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/08ExcludeList?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/08ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/08ExcludeList Tue Jul 15 10:06:16 2008
@@ -5,5 +5,3 @@
 // Those tests are not finished
 org.apache.qpid.test.testcases.TTLTest#*
 org.apache.qpid.test.testcases.FailoverTest#*
-// Client Race Condition 
-org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jul 15 10:06:16 2008
@@ -26,7 +26,6 @@
 import org.apache.qpid.AMQUnresolvedAddressException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -76,11 +75,10 @@
         private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
         private int _size = 0;
         private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
-        
 
         public AMQSession get(int channelId)
         {
-            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
             {
                 return _fastAccessSessions[channelId];
             }
@@ -93,7 +91,7 @@
         public AMQSession put(int channelId, AMQSession session)
         {
             AMQSession oldVal;
-            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
             {
                 oldVal = _fastAccessSessions[channelId];
                 _fastAccessSessions[channelId] = session;
@@ -102,11 +100,11 @@
             {
                 oldVal = _slowAccessSessions.put(channelId, session);
             }
-            if((oldVal != null) && (session == null))
+            if ((oldVal != null) && (session == null))
             {
                 _size--;
             }
-            else if((oldVal == null) && (session != null))
+            else if ((oldVal == null) && (session != null))
             {
                 _size++;
             }
@@ -115,13 +113,12 @@
 
         }
 
-
         public AMQSession remove(int channelId)
         {
             AMQSession session;
-            if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
             {
-                 session = _fastAccessSessions[channelId];
+                session = _fastAccessSessions[channelId];
                 _fastAccessSessions[channelId] = null;
             }
             else
@@ -129,7 +126,7 @@
                 session = _slowAccessSessions.remove(channelId);
             }
 
-            if(session != null)
+            if (session != null)
             {
                 _size--;
             }
@@ -141,9 +138,9 @@
         {
             ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
 
-            for(int i = 0; i < 16; i++)
+            for (int i = 0; i < 16; i++)
             {
-                if(_fastAccessSessions[i] != null)
+                if (_fastAccessSessions[i] != null)
                 {
                     values.add(_fastAccessSessions[i]);
                 }
@@ -162,14 +159,13 @@
         {
             _size = 0;
             _slowAccessSessions.clear();
-            for(int i = 0; i<16; i++)
+            for (int i = 0; i < 16; i++)
             {
                 _fastAccessSessions[i] = null;
             }
         }
     }
 
-
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
     protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -211,7 +207,6 @@
 
     /** The virtual path to connect to on the AMQ server */
     private String _virtualHost;
-   
 
     protected ExceptionListener _exceptionListener;
 
@@ -252,15 +247,12 @@
     private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
 
     protected AMQConnectionDelegate _delegate;
-    
+
     // this connection maximum number of prefetched messages
     private long _maxPrefetch;
 
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
-    
-    /** used to hold a list of all exceptions that have been thrown during connection construction. gross */
-    final ArrayList<Exception> _exceptions = new ArrayList<Exception>();
 
     /**
      * @param broker      brokerdetails
@@ -337,20 +329,20 @@
 
     /**
      * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
-     *       was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+     * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
      */
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
         // set this connection maxPrefetch
         if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
         {
-            _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+            _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
         }
         else
         {
             // use the defaul value set for all connections
             _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
-                    ClientProperties.MAX_PREFETCH_DEFAULT));
+                                                                           ClientProperties.MAX_PREFETCH_DEFAULT));
         }
 
         if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -378,25 +370,6 @@
             _delegate = new AMQConnectionDelegate_0_10(this);
         }
 
-       
-        class Listener implements ExceptionListener
-        {
-            public void onException(JMSException e)
-            {
-                _exceptions.add(e);
-            }
-        }
-
-        try
-        {
-            setExceptionListener(new Listener());
-        }
-        catch (JMSException e)
-        {
-            // Shouldn't happen
-            throw new AMQException(null, null, e);
-        }
-
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
@@ -436,15 +409,15 @@
             _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
         }
 
-
-         _protocolHandler = new AMQProtocolHandler(this);
+        _protocolHandler = new AMQProtocolHandler(this);
 
         // We are not currently connected
         _connected = false;
 
         // TMG FIXME this seems... wrong...
         boolean retryAllowed = true;
-        while (!_connected && retryAllowed )
+        Exception connectionException = null;
+        while (!_connected && retryAllowed)
         {
             try
             {
@@ -456,37 +429,29 @@
                 {
                     _logger.info(pe.getMessage());
                     _logger.info("Trying broker supported protocol version: " +
-                            TransportConstants.getVersionMajor() + "." +
-                            TransportConstants.getVersionMinor());
+                                 TransportConstants.getVersionMajor() + "." +
+                                 TransportConstants.getVersionMinor());
                 }
                 // we need to check whether we have a delegate for the supported protocol
                 getDelegate();
             }
             catch (Exception e)
             {
-                _exceptions.add(e);
                 if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Unable to connect to broker at " + 
-                                _failoverPolicy.getCurrentBrokerDetails(),
-                                e);
+                    _logger.info("Unable to connect to broker at " +
+                                 _failoverPolicy.getCurrentBrokerDetails(),
+                                 e);
                 }
+                connectionException = e;
             }
-            
+
             if (!_connected)
             {
                 retryAllowed = _failoverPolicy.failoverAllowed();
                 brokerDetails = _failoverPolicy.getNextBrokerDetails();
             }
         }
-        try
-        {
-            setExceptionListener(null);
-        }
-        catch (JMSException e1)
-        {
-            // Can't happen
-        }
 
         if (_logger.isDebugEnabled())
         {
@@ -496,26 +461,16 @@
         if (!_connected)
         {
             String message = null;
-            try
-            {
-                Thread.sleep(150);
-            }
-            catch (InterruptedException e)
-            {
-                // Eat it, we've hopefully got all the exceptions if this happened
-            }
-            
-            Exception lastException = null;
-            if (_exceptions.size() > 0)
+
+            if (connectionException != null)
             {
-                lastException = _exceptions.get(_exceptions.size() - 1);
-                if (lastException.getCause() != null)
+                if (connectionException.getCause() != null)
                 {
-                    message = lastException.getCause().getMessage();
+                    message = connectionException.getCause().getMessage();
                 }
                 else
                 {
-                    message = lastException.getMessage();
+                    message = connectionException.getMessage();
                 }
             }
 
@@ -527,20 +482,20 @@
                 }
                 else // can only be "" if getMessage() returned it therfore lastException != null
                 {
-                    message = "Unable to Connect:" + lastException.getClass();
+                    message = "Unable to Connect:" + connectionException.getClass();
                 }
             }
 
-            AMQException e = new AMQConnectionFailureException(message, _exceptions);
-                        
-            if (lastException != null)
+            AMQException e = new AMQConnectionFailureException(message, connectionException);
+
+            if (connectionException != null)
             {
-                if (lastException instanceof UnresolvedAddressException)
+                if (connectionException instanceof UnresolvedAddressException)
                 {
                     e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
                                                           null);
                 }
-                
+
             }
             throw e;
         }
@@ -565,18 +520,18 @@
         try
         {
             Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
-                    TransportConstants.getVersionMajor() + "_" +
-                    TransportConstants.getVersionMinor());
-             Class partypes[] = new Class[1];
+                                    TransportConstants.getVersionMajor() + "_" +
+                                    TransportConstants.getVersionMinor());
+            Class partypes[] = new Class[1];
             partypes[0] = AMQConnection.class;
             _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
         }
         catch (Exception e)
         {
             throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
-                    "Protocol: " + TransportConstants.getVersionMajor() + "."
-            + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
-                            "currently supported by this client library implementation", e);
+                                           "Protocol: " + TransportConstants.getVersionMajor() + "."
+                                           + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
+                                           "currently supported by this client library implementation", e);
         }
     }
 
@@ -867,7 +822,6 @@
                 }
             }
 
-
         }
     }
 
@@ -892,14 +846,14 @@
         }
     }
 
-    public void close() throws JMSException
+    public void     close() throws JMSException
     {
         close(DEFAULT_TIMEOUT);
     }
 
     public void close(long timeout) throws JMSException
     {
-        close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+        close(new ArrayList<AMQSession>(_sessions.values()), timeout);
     }
 
     public void close(List<AMQSession> sessions, long timeout) throws JMSException
@@ -912,12 +866,12 @@
 
     private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
     {
-        synchronized(_sessionCreationLock)
+        synchronized (_sessionCreationLock)
         {
-            if(!sessions.isEmpty())
+            if (!sessions.isEmpty())
             {
                 AMQSession session = sessions.remove(0);
-                synchronized(session.getMessageDeliveryLock())
+                synchronized (session.getMessageDeliveryLock())
                 {
                     doClose(sessions, timeout);
                 }
@@ -1120,7 +1074,7 @@
     {
         return _sessions;
     }
-    
+
     public String getUsername()
     {
         return _username;
@@ -1297,6 +1251,8 @@
         if (cause instanceof IOException)
         {
             closer = !_closed.getAndSet(true);
+
+            _protocolHandler.getProtocolSession().notifyError(je);
         }
 
         if (_exceptionListener != null)
@@ -1339,7 +1295,7 @@
     {
         if (cause instanceof AMQException)
         {
-            return ((AMQException)cause).isHardError();
+            return ((AMQException) cause).isHardError();
         }
 
         return true;
@@ -1483,7 +1439,7 @@
      */
     public long getMaxPrefetch()
     {
-       return _maxPrefetch;
+        return _maxPrefetch;
     }
 
     /**
@@ -1495,14 +1451,4 @@
     {
         return _syncPersistence;
     }
-
-    public Exception getLastException()
-    {
-        if (_exceptions.size() > 0)
-        {
-            return _exceptions.get(_exceptions.size() - 1);
-        }
-        return null;
-    }
-
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Tue Jul 15 10:06:16 2008
@@ -32,12 +32,12 @@
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
-import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.framing.BasicQosBody;
 import org.apache.qpid.framing.BasicQosOkBody;
@@ -84,11 +84,15 @@
         final Set<AMQState> openOrClosedStates =
                 EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
 
+
+        StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
+
         TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
         // this blocks until the connection has been set up or when an error
         // has prevented the connection being set up
 
-        AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+        AMQState state = waiter.await();
+
         if(state == AMQState.CONNECTION_OPEN)
         {
             _conn._failoverPolicy.attainedConnection();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jul 15 10:06:16 2008
@@ -79,6 +79,8 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
@@ -90,22 +92,20 @@
 import org.slf4j.LoggerFactory;
 
 /**
- *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td>
  * </table>
  *
  * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
- *       example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
- *       fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
- *       the fail-over process, the retry handler could be used to automatically retry the operation once the connection
- *       has been reestablished. All fail-over protected operations should be placed in private methods, with
- *       FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
- *       fail-over process sets a nowait flag and uses an async method call instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
  * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
- *       after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
  */
 public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
@@ -114,10 +114,9 @@
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
         private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
 
-
         public BasicMessageConsumer get(int id)
         {
-            if((id & 0xFFFFFFF0) == 0)
+            if ((id & 0xFFFFFFF0) == 0)
             {
                 return _fastAccessConsumers[id];
             }
@@ -130,7 +129,7 @@
         public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
         {
             BasicMessageConsumer oldVal;
-            if((id & 0xFFFFFFF0) == 0)
+            if ((id & 0xFFFFFFF0) == 0)
             {
                 oldVal = _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = consumer;
@@ -144,13 +143,12 @@
 
         }
 
-
         public BasicMessageConsumer remove(int id)
         {
             BasicMessageConsumer consumer;
-            if((id & 0xFFFFFFF0) == 0)
+            if ((id & 0xFFFFFFF0) == 0)
             {
-                 consumer = _fastAccessConsumers[id];
+                consumer = _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = null;
             }
             else
@@ -166,9 +164,9 @@
         {
             ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
 
-            for(int i = 0; i < 16; i++)
+            for (int i = 0; i < 16; i++)
             {
-                if(_fastAccessConsumers[i] != null)
+                if (_fastAccessConsumers[i] != null)
                 {
                     values.add(_fastAccessConsumers[i]);
                 }
@@ -178,11 +176,10 @@
             return values;
         }
 
-
         public void clear()
         {
             _slowAccessConsumers.clear();
-            for(int i = 0; i<16; i++)
+            for (int i = 0; i < 16; i++)
             {
                 _fastAccessConsumers[i] = null;
             }
@@ -280,19 +277,13 @@
      */
     protected final FlowControllingBlockingQueue _queue;
 
-    /**
-     * Holds the highest received delivery tag.
-     */
+    /** Holds the highest received delivery tag. */
     private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
 
-    /**
-     * All the not yet acknowledged message tags
-     */
+    /** All the not yet acknowledged message tags */
     protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    /**
-     * All the delivered message tags
-     */
+    /** All the delivered message tags */
     protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
 
     /** Holds the dispatcher thread for this session. */
@@ -315,9 +306,9 @@
      * consumer.
      */
     protected final IdToConsumerMap _consumers = new IdToConsumerMap();
-    
-            //Map<AMQShortString, BasicMessageConsumer> _consumers =
-            //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+    //Map<AMQShortString, BasicMessageConsumer> _consumers =
+    //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /**
      * Contains a list of consumers which have been removed but which might still have
@@ -380,15 +371,13 @@
     /** Has failover occured on this session */
     private boolean _failedOver;
 
-
-
     private static final class FlowControlIndicator
     {
         private volatile boolean _flowControl = true;
 
         public synchronized void setFlowControl(boolean flowControl)
         {
-            _flowControl= flowControl;
+            _flowControl = flowControl;
             notify();
         }
 
@@ -450,8 +439,8 @@
                                                          public void aboveThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                 "Above threshold(" + _defaultPrefetchHighMark
-                                                                 + ") so suspending channel. Current value is " + currentValue);
+                                                                     "Above threshold(" + _defaultPrefetchHighMark
+                                                                     + ") so suspending channel. Current value is " + currentValue);
                                                              _suspendState.set(true);
                                                              new Thread(new SuspenderRunner(_suspendState)).start();
 
@@ -460,8 +449,8 @@
                                                          public void underThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                         "Below threshold(" + _defaultPrefetchLowMark
-                                                                         + ") so unsuspending channel. Current value is " + currentValue);
+                                                                     "Below threshold(" + _defaultPrefetchLowMark
+                                                                     + ") so unsuspending channel. Current value is " + currentValue);
                                                              _suspendState.set(false);
                                                              new Thread(new SuspenderRunner(_suspendState)).start();
 
@@ -503,6 +492,19 @@
         close(-1);
     }
 
+    public void checkNotClosed() throws JMSException
+    {
+        // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
+        AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+        if (isClosed() && manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+        {
+            JMSException jmse = new IllegalStateException("Object " + toString() + " has been closed");
+            jmse.setLinkedException(manager.getLastException());
+            throw jmse;
+        }
+        super.checkNotClosed();
+    }
+
     public BytesMessage createBytesMessage() throws JMSException
     {
         checkNotClosed();
@@ -519,7 +521,7 @@
         if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
-        } 
+        }
         else if (hasFailedOver())
         {
             throw new IllegalStateException("has failed over");
@@ -564,39 +566,35 @@
      * @param exchangeName The exchange to bind the queue on.
      *
      * @throws AMQException If the queue cannot be bound for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
-     *
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                          final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
+                          final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
+                sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
                 return null;
             }
         }, _connection).execute();
     }
 
-
     public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
     {
-        if( consumer.getQueuename() != null)
+        if (consumer.getQueuename() != null)
         {
-            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
+            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
         }
     }
 
     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
+                                       final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
 
     /**
-
      * Closes the session.
      *
      * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
@@ -606,14 +604,11 @@
      * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
      *
      * @throws JMSException If the JMS provider fails to close the session due to some internal error.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
-     *
      * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
-     *       re-opened. May need to examine this more carefully.
-     *
+     * re-opened. May need to examine this more carefully.
      * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
-     *       because the failover process sends the failover event before acquiring the mutex itself.
+     * because the failover process sends the failover event before acquiring the mutex itself.
      */
     public void close(long timeout) throws JMSException
     {
@@ -621,7 +616,7 @@
         {
             StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
             _logger.info("Closing session: " + this); // + ":"
-                         // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+            // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
         // Ensure we only try and close an open session.
@@ -638,7 +633,7 @@
 
                     try
                     {
-                       sendClose(timeout);
+                        sendClose(timeout);
                     }
                     catch (AMQException e)
                     {
@@ -705,7 +700,6 @@
                         amqe = new AMQException("Closing session forcibly", e);
                     }
 
-
                     _connection.deregisterSession(_channelId);
                     closeProducersAndConsumers(amqe);
                 }
@@ -723,12 +717,11 @@
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
     {
-    	checkTransacted();
+        checkTransacted();
 
         try
         {
@@ -792,10 +785,8 @@
 
                 // consumer.markClosed();
 
-
-
                 if (consumer.isAutoClose())
-                {     
+                {
                     // There is a small window where the message is between the two queues in the dispatcher.
                     if (consumer.isClosed())
                     {
@@ -863,7 +854,6 @@
                                   false, false);
     }
 
-
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
@@ -881,7 +871,6 @@
                                   messageSelector, null, false, false);
     }
 
-
     public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
@@ -891,7 +880,6 @@
                                   messageSelector, null, false, false);
     }
 
-
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
                                           String selector) throws JMSException
     {
@@ -928,7 +916,7 @@
     public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkNotClosed();
         checkValidTopic(topic);
@@ -996,7 +984,7 @@
     {
         checkNotClosed();
 
-        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
+        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -1022,7 +1010,6 @@
         }
     }
 
-
     /**
      * Declares the named queue.
      *
@@ -1034,7 +1021,6 @@
      * @param exclusive  Flag to indicate that the queue is exclusive to this client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1043,7 +1029,6 @@
         createQueue(name, autoDelete, durable, exclusive, null);
     }
 
-
     /**
      * Declares the named queue.
      *
@@ -1056,7 +1041,6 @@
      * @param arguments  Arguments used to set special properties of the queue
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1073,7 +1057,8 @@
     }
 
     public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
-            final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException;
+                                         final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException;
+
     /**
      * Creates a QueueReceiver
      *
@@ -1191,7 +1176,6 @@
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
     }
 
-
     /**
      * Creates a non-durable subscriber with a message selector
      *
@@ -1382,7 +1366,7 @@
         if (message instanceof ReturnMessage)
         {
             // Return of the bounced message.
-            returnBouncedMessage((ReturnMessage)message);
+            returnBouncedMessage((ReturnMessage) message);
         }
         else
         {
@@ -1398,7 +1382,7 @@
         AMQProtocolHandler protocolHandler = getProtocolHandler();
         declareExchange(amqd, protocolHandler, false);
         AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
     }
 
     /**
@@ -1413,7 +1397,7 @@
      * <li>Stop message delivery.</li>
      * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
      * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
-     *     Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+     * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
      * </ul>
      *
      * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
@@ -1503,7 +1487,6 @@
      * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
      *                      not mean that the rollback is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void rollback() throws JMSException
@@ -1545,8 +1528,7 @@
 
     public abstract void releaseForRollback();
 
-    public abstract void  sendRollback() throws AMQException, FailoverException ;
-
+    public abstract void sendRollback() throws AMQException, FailoverException;
 
     public void run()
     {
@@ -1673,8 +1655,8 @@
                             ft.addAll(rawSelector);
                         }
 
-                        BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
-                                noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
+                        BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+                                                                              noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -1718,8 +1700,8 @@
     }
 
     public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
-            final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-            final boolean noConsume, final boolean autoClose) throws JMSException;
+                                                               final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+                                                               final boolean noConsume, final boolean autoClose) throws JMSException;
 
     /**
      * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
@@ -1782,12 +1764,11 @@
      * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
      *
      * @throws JMSException If the query fails for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException;
-            
+
     public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
 
     /**
@@ -1828,10 +1809,9 @@
      * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
      *
      * @throws AMQException If the session cannot be started for any reason.
-     *
      * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
-     *       FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
-     *       for each subsequent call to flow.. only need to do this if we have called stop.
+     * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+     * for each subsequent call to flow.. only need to do this if we have called stop.
      */
     void start() throws AMQException
     {
@@ -2032,7 +2012,7 @@
             }
         }
         // at this point the _consumers map will be empty
-         if (_dispatcher != null)
+        if (_dispatcher != null)
         {
             _dispatcher.close();
             _dispatcher = null;
@@ -2124,7 +2104,7 @@
     }
 
     public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
-            AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+                                     AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
 
     private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
@@ -2143,7 +2123,7 @@
                         checkNotClosed();
                         long producerId = getNextProducerId();
                         BasicMessageProducer producer = createMessageProducer(destination, mandatory,
-                                immediate, waitUntilSent, producerId);
+                                                                              immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2152,20 +2132,19 @@
     }
 
     public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, final boolean waitUntilSent, long producerId);
+                                                               final boolean immediate, final boolean waitUntilSent, long producerId);
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
     }
 
-
     /**
      * Returns the number of messages currently queued for the given destination.
      *
      * <p/>Note that this operation automatically retries in the event of fail-over.
      *
-     * @param amqd            The destination to be checked
+     * @param amqd The destination to be checked
      *
      * @return the number of queued messages.
      *
@@ -2198,7 +2177,6 @@
      * @param nowait
      *
      * @throws AMQException If the exchange cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2215,8 +2193,7 @@
     }
 
     public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
-            final boolean nowait) throws AMQException, FailoverException;
-
+                                             final boolean nowait) throws AMQException, FailoverException;
 
     /**
      * Declares a queue for a JMS destination.
@@ -2234,9 +2211,7 @@
      *         the client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Verify the destiation is valid or throw an exception.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
@@ -2262,7 +2237,7 @@
                 }, _connection).execute();
     }
 
-    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
 
     /**
      * Undeclares the specified queue.
@@ -2272,7 +2247,6 @@
      * @param queueName The name of the queue to delete.
      *
      * @throws JMSException If the queue could not be deleted for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2294,7 +2268,7 @@
         }
     }
 
-    public abstract void sendQueueDelete(final AMQShortString queueName)  throws AMQException, FailoverException;
+    public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
 
     private long getNextProducerId()
     {
@@ -2384,7 +2358,7 @@
         consumer.setQueuename(queueName);
 
         // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)
@@ -2469,7 +2443,7 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                        + message.getDeliveryTag());
+                                  + message.getDeliveryTag());
                 }
 
                 messages.remove();
@@ -2519,10 +2493,10 @@
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                            		msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
-                        AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
-                        AMQShortString reason = msg.getReplyText();
-                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                    AMQShortString reason = msg.getReplyText();
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
                     if (errorCode == AMQConstant.NO_CONSUMERS)
@@ -2557,7 +2531,6 @@
      *                should be unsuspended.
      *
      * @throws AMQException If the session cannot be suspended for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
@@ -2598,7 +2571,6 @@
         return getAMQConnection().getMaxPrefetch() > 0;
     }
 
-
     /** Signifies that the session has pending sends to commit. */
     public void markDirty()
     {
@@ -2642,12 +2614,11 @@
         _flowControl.setFlowControl(active);
     }
 
-
     public void checkFlowControl() throws InterruptedException
     {
-        synchronized(_flowControl)
+        synchronized (_flowControl)
         {
-            while(!_flowControl.getFlowControl())
+            while (!_flowControl.getFlowControl())
             {
                 _flowControl.wait();
             }
@@ -2655,7 +2626,6 @@
 
     }
 
-
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     class Dispatcher extends Thread
     {
@@ -2856,7 +2826,7 @@
             //if (message.getDeliverBody() != null)
             //{
             final BasicMessageConsumer consumer =
-                _consumers.get(message.getConsumerTag().toIntValue());
+                    _consumers.get(message.getConsumerTag().toIntValue());
 
             if ((consumer == null) || consumer.isClosed())
             {
@@ -2865,14 +2835,14 @@
                     if (consumer == null)
                     {
                         _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliveryTag() + "] from queue "
-                                + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
+                                               + message.getDeliveryTag() + "] from queue "
+                                               + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                     }
                     else
                     {
                         _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliveryTag() + "] from queue " + " consumer("
-                                + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+                                               + message.getDeliveryTag() + "] from queue " + " consumer("
+                                               + message.getConsumerTag() + ") is closed rejecting(requeue)...");
                     }
                 }
                 // Don't reject if we're already closing
@@ -2930,7 +2900,7 @@
         {
             try
             {
-                synchronized(_suspensionLock)
+                synchronized (_suspensionLock)
                 {
                     suspendChannel(_suspend.get());
                 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Jul 15 10:06:16 2008
@@ -482,7 +482,7 @@
                                                        false,
                                                        null).generateFrame(_channelId);
         QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
-        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);        
         return okHandler._messageCount;
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Tue Jul 15 10:06:16 2008
@@ -139,11 +139,15 @@
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-            _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
+
+            _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
                 _logger.info("Failover process veto-ed by client");
 
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
 
                 //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
@@ -181,13 +185,19 @@
 
             if (!failoverSucceeded)
             {
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
+
                 _amqProtocolHandler.getConnection().exceptionReceived(
                         new AMQDisconnectedException("Server closed connection and no failover " +
                                 "was successful", null));
             }
             else
             {
+                // Set the new Protocol Session in the StateManager.               
+                existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
+
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
                 try
                 {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -5,14 +5,8 @@
 
 import org.apache.qpid.framing.*;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
 
 public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
 {
@@ -25,11 +19,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId)
         throws AMQException
     {
         _logger.debug("AccessRequestOk method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         session.setTicket(method.getTicket(), channelId);
 
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,11 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,13 +42,9 @@
     private BasicCancelOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId)
         throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-
         if (_logger.isInfoEnabled())
         {
             _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -21,10 +21,8 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicDeliverBody;
 import org.slf4j.Logger;
@@ -41,10 +39,9 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
-        throws AMQException
+    public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId)
+            throws AMQException
     {
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
                 channelId,
                 body.getDeliveryTag(),
@@ -52,7 +49,7 @@
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getRedelivered());
-        _logger.debug("New JmsDeliver method received");
+        _logger.debug("New JmsDeliver method received:" + session);
         session.unprocessedMessageReceived(msg);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,13 +22,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +41,10 @@
     }
 
 
-    public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId)
     throws AMQException
     {
         _logger.debug("New JmsBounce method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         final ReturnMessage msg = new ReturnMessage(channelId,
                 body.getExchange(),
                 body.getRoutingKey(),

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -26,14 +26,12 @@
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,12 +47,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
 
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -23,9 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +39,11 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager,  ChannelCloseOkBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session,  ChannelCloseOkBody method, int channelId)
         throws AMQException
     {
         _logger.info("Received channel-close-ok for channel-id " + channelId);
 
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         // todo this should do the local closure
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -2,7 +2,6 @@
 
 import org.apache.qpid.framing.ChannelFlowBody;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 import org.slf4j.Logger;
@@ -42,11 +41,9 @@
     private ChannelFlowMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId)
             throws AMQException
     {
-
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         session.setFlowControl(channelId, body.getActive());
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@
     private ChannelFlowOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId)
             throws AMQException
     {
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Tue Jul 15 10:06:16 2008
@@ -27,31 +27,33 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientMethodDispatcherImpl implements MethodDispatcher
 {
 
-
-    private static final BasicCancelOkMethodHandler      _basicCancelOkMethodHandler      = BasicCancelOkMethodHandler.getInstance();
-    private static final BasicDeliverMethodHandler       _basicDeliverMethodHandler       = BasicDeliverMethodHandler.getInstance();
-    private static final BasicReturnMethodHandler        _basicReturnMethodHandler        = BasicReturnMethodHandler.getInstance();
-    private static final ChannelCloseMethodHandler       _channelCloseMethodHandler       = ChannelCloseMethodHandler.getInstance();
-    private static final ChannelFlowOkMethodHandler      _channelFlowOkMethodHandler      = ChannelFlowOkMethodHandler.getInstance();
-    private static final ConnectionCloseMethodHandler    _connectionCloseMethodHandler    = ConnectionCloseMethodHandler.getInstance();
-    private static final ConnectionOpenOkMethodHandler   _connectionOpenOkMethodHandler   = ConnectionOpenOkMethodHandler.getInstance();
+    private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+    private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+    private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+    private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+    private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+    private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+    private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
     private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
-    private static final ConnectionSecureMethodHandler   _connectionSecureMethodHandler   = ConnectionSecureMethodHandler.getInstance();
-    private static final ConnectionStartMethodHandler    _connectionStartMethodHandler    = ConnectionStartMethodHandler.getInstance();
-    private static final ConnectionTuneMethodHandler     _connectionTuneMethodHandler     = ConnectionTuneMethodHandler.getInstance();
-    private static final ExchangeBoundOkMethodHandler    _exchangeBoundOkMethodHandler    = ExchangeBoundOkMethodHandler.getInstance();
-    private static final QueueDeleteOkMethodHandler      _queueDeleteOkMethodHandler      = QueueDeleteOkMethodHandler.getInstance();
+    private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+    private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+    private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+    private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+    private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
 
+    private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
 
 
     private static interface DispatcherFactory
     {
-        public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+        public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session);
     }
 
     private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -62,44 +64,40 @@
         _dispatcherFactories.put(ProtocolVersion.v8_0,
                                  new DispatcherFactory()
                                  {
-                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
                                      {
-                                         return new ClientMethodDispatcherImpl_8_0(stateManager);
+                                         return new ClientMethodDispatcherImpl_8_0(session);
                                      }
                                  });
 
         _dispatcherFactories.put(ProtocolVersion.v0_9,
                                  new DispatcherFactory()
                                  {
-                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
                                      {
-                                         return new ClientMethodDispatcherImpl_0_9(stateManager);
+                                         return new ClientMethodDispatcherImpl_0_9(session);
                                      }
                                  });
 
     }
 
-
-    public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+    public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
     {
+        _logger.error("New Method Dispatcher:" + session);
         DispatcherFactory factory = _dispatcherFactories.get(version);
-        return factory.createMethodDispatcher(stateManager);
+        return factory.createMethodDispatcher(session);
     }
-    
-
 
+    AMQProtocolSession _session;
 
-    private AMQStateManager _stateManager;
-
-    public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl(AMQProtocolSession session)
     {
-        _stateManager = stateManager;
+        _session = session;
     }
 
-
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
+        return _session.getStateManager();
     }
 
     public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
@@ -109,7 +107,7 @@
 
     public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
     {
-        _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -120,7 +118,7 @@
 
     public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
     {
-        _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicDeliverMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -141,13 +139,13 @@
 
     public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
     {
-        _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicReturnMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
     {
-        _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        _channelCloseMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -163,7 +161,7 @@
 
     public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
     {
-        _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _channelFlowOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -174,7 +172,7 @@
 
     public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
     {
-        _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionCloseMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -185,37 +183,37 @@
 
     public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
     {
-        _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
     {
-        _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionRedirectMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
     {
-        _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionSecureMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
     {
-        _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionStartMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
     {
-        _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionTuneMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
     {
-        _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -431,7 +429,7 @@
 
     public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
     {
-        _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -522,7 +520,7 @@
 
     public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
     {
-        return false;  
+        return false;
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Tue Jul 15 10:06:16 2008
@@ -26,16 +26,15 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
 {
-    public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
     {
-        super(stateManager);
+        super(session);
     }
 
-
     public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
     {
         return false;
@@ -148,8 +147,7 @@
 
     public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
     {
-        return false;  
+        return false;
     }
 
-
 }