You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 19:06:18 UTC

svn commit: r676978 [2/3] - in /incubator/qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/q...

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Tue Jul 15 10:06:16 2008
@@ -24,13 +24,13 @@
 import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
 {
-    public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session)
     {
-        super(stateManager);
+        super(session);
     }
 
     public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -25,13 +25,11 @@
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,14 +46,13 @@
     }
 
     private ConnectionCloseMethodHandler()
-    { }
+    {
+    }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
-                throws AMQException
+    public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId)
+            throws AMQException
     {
         _logger.info("ConnectionClose frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-        
 
         // does it matter
         // stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -63,6 +60,8 @@
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();
 
+        AMQException error = null;
+
         try
         {
 
@@ -75,35 +74,33 @@
             {
                 if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
                 {
-                    _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName());
-
-                    // todo ritchiem : Why do this here when it is going to be done in the finally block?
-                    session.closeProtocolSession();
+                    _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
 
-                    // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
-                    stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
-
-                    throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+                    error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
                 }
                 else
                 {
                     _logger.info("Connection close received with error code " + errorCode);
 
-                    throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
+                    error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
                 }
             }
         }
         finally
         {
-            // this actually closes the connection in the case where it is not an error.
 
+            if (error != null)
+            {
+                session.notifyError(error);
+            }            
+
+            // Close the protocol Session, including any open TCP connections 
             session.closeProtocolSession();
 
-            // ritchiem: Doing this though will cause any waiting connection start to be released without being able to
-            // see what the cause was.
-            stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            // Closing the session should not introduce a race condition as this thread will continue to propagate any
+            // exception into the exceptionCaught method of the SessionHandler.
+            // Any sessionClosed event should occur after this.
         }
     }
 
-
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -24,9 +24,7 @@
 import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
 {
@@ -41,10 +39,10 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
                 throws AMQException            
     {
-        stateManager.changeState(AMQState.CONNECTION_OPEN);
+        session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
     private ConnectionRedirectMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId)
             throws AMQException
     {
         _logger.info("ConnectionRedirect frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();         
 
         String host = method.getHost().toString();
         // the host is in the form hostname:port with the port being optional

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -25,12 +25,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ConnectionSecureBody;
 import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
 {
@@ -41,10 +38,9 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId)
                 throws AMQException
     {
-        final AMQProtocolSession session = stateManager.getProtocolSession(); 
         SaslClient client = session.getSaslClient();
         if (client == null)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -25,7 +25,6 @@
 import org.apache.qpid.client.security.AMQCallbackHandler;
 import org.apache.qpid.client.security.CallbackHandlerRegistry;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.common.QpidProperties;
@@ -35,7 +34,6 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,15 +60,12 @@
     private ConnectionStartMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
             throws AMQException
     {
         _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
             + "AMQMethodEvent evt): called");
 
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
-
         ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
 
         // For the purposes of interop, we can make the client accept the broker's version string.
@@ -145,7 +140,7 @@
                     throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
                 }
 
-                stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
 
                 clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -24,10 +24,8 @@
 import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
     protected ConnectionTuneMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
                 throws AMQException
     {
         _logger.debug("ConnectionTune frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         final MethodRegistry methodRegistry = session.getMethodRegistry();
 
 
@@ -65,7 +62,7 @@
         params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
         session.setConnectionTuneParameters(params);
 
-        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
 
         ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
                                                                                     params.getFrameMax(),

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
     private ExchangeBoundOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId)
             throws AMQException
     {
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Tue Jul 15 10:06:16 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
     private QueueDeleteOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId)
             throws AMQException
     {        
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jul 15 10:06:16 2008
@@ -43,6 +43,7 @@
 import org.apache.qpid.client.failover.FailoverState;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
@@ -100,23 +101,22 @@
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Create the filter chain to filter this handlers events.
- *     <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
  *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
  *
  * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- *       async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- *       anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- *       filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
  * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- *       failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- *       AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- *       be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- *       held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- *       that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
  */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
@@ -136,7 +136,7 @@
     private AMQStateManager _stateManager = new AMQStateManager();
 
     /** Holds the method listeners, */
-    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
 
     /**
      * We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -154,14 +154,12 @@
     /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
     private CountDownLatch _failoverLatch;
 
-
     /** The last failover exception that occured */
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
-
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -245,7 +243,10 @@
                 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
             }
         }
-        _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+        _protocolSession = new AMQProtocolSession(this, session, _connection);
+
+        _stateManager.setProtocolSession(_protocolSession);
+
         _protocolSession.init();
     }
 
@@ -263,7 +264,7 @@
      * @param session The MINA session.
      *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
-     *       not otherwise? The above comment doesn't make that clear.
+     * not otherwise? The above comment doesn't make that clear.
      */
     public void sessionClosed(IoSession session)
     {
@@ -374,7 +375,7 @@
                                  "cause isn't AMQConnectionClosedException: " + cause, cause);
 
                     AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-                    propagateExceptionToWaiters(amqe);
+                    propagateExceptionToAllWaiters(amqe);
                 }
                 _connection.exceptionReceived(cause);
 
@@ -395,7 +396,7 @@
             // we notify the state manager of the error in case we have any clients waiting on a state
             // change. Those "waiters" will be interrupted and can handle the exception
             AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-            propagateExceptionToWaiters(amqe);
+            propagateExceptionToAllWaiters(amqe);
             _connection.exceptionReceived(cause);
         }
     }
@@ -405,11 +406,33 @@
      * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
      * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
      *
+     * This should be called only when the exception is fatal for the connection.
+     *
      * @param e the exception to propagate
+     *
+     * @see #propagateExceptionToFrameListeners
+     * @see #propagateExceptionToStateWaiters
      */
-    public void propagateExceptionToWaiters(Exception e)
+    public void propagateExceptionToAllWaiters(Exception e)
+    {
+        propagateExceptionToFrameListeners(e);
+        propagateExceptionToStateWaiters(e);
+    }
+
+    /**
+     * This caters for the case where we only need to propagate an exception to the frame listeners to interrupt any
+     * protocol level waits.
+     *
+     * This would normally be used to notify all Frame Listeners that Failover is about to occur and they should
+     * stop waiting and relinquish the Failover lock {@link FailoverHandler}.
+     *
+     * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
+     * their protocol request and so listen again for the correct frame.
+     *
+     * @param e the exception to propagate
+     */
+    public void propagateExceptionToFrameListeners(Exception e)
     {
-        
         if (!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -421,6 +444,22 @@
         }
     }
 
+    /**
+     * This caters for the case where we only need to propagate an exception to the state manager to interrupt
+     * anything waiting for a state change.
+     *
+     * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishment.
+     *
+     * Normally the state manager would not need to be notified without notifying the frame listeners so in normal
+     * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
+     *
+     * @param e the exception to propagate
+     */
+    public void propagateExceptionToStateWaiters(Exception e)
+    {
+        getStateManager().error(e);
+    }
+
     public void notifyFailoverStarting()
     {
         // Set the last exception in the sync block to ensure the ordering with add.
@@ -431,7 +470,9 @@
             _lastFailoverException = new FailoverException("Failing over about to start");
         }
 
-        propagateExceptionToWaiters(_lastFailoverException);
+        // Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
+        // interrupted unless failover cannot restore the state.
+        propagateExceptionToFrameListeners(_lastFailoverException);
     }
 
     public void failoverInProgress()
@@ -443,7 +484,7 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
-        if(message instanceof AMQFrame)
+        if (message instanceof AMQFrame)
         {
             final boolean debug = _logger.isDebugEnabled();
             final long msgNumber = ++_messageReceivedCount;
@@ -459,7 +500,7 @@
 
             HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-            bodyFrame.handle(frame.getChannel(),_protocolSession);
+            bodyFrame.handle(frame.getChannel(), _protocolSession);
 
             _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
         }
@@ -508,20 +549,12 @@
             if (!wasAnyoneInterested)
             {
                 throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                                       + _frameListeners, null);
+                                             + _frameListeners, null);
             }
         }
         catch (AMQException e)
-        {            
-            if (!_frameListeners.isEmpty())
-            {
-                Iterator it = _frameListeners.iterator();
-                while (it.hasNext())
-                {
-                    final AMQMethodListener listener = (AMQMethodListener) it.next();
-                    listener.error(e);
-                }
-            }
+        {
+            propagateExceptionToFrameListeners(e);
 
             exceptionCaught(session, e);
         }
@@ -548,28 +581,11 @@
         }
     }
 
-    /*
-      public void addFrameListener(AMQMethodListener listener)
-      {
-          _frameListeners.add(listener);
-      }
-
-      public void removeFrameListener(AMQMethodListener listener)
-      {
-          _frameListeners.remove(listener);
-      }
-     */
-    public void attainState(AMQState s) throws Exception
-    {
-        getStateManager().attainState(s);
-    }
-
-    public AMQState attainState(Set<AMQState> states) throws AMQException
+    public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
     {
-        return getStateManager().attainState(states);
+        return getStateManager().createWaiter(states);
     }
 
-
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
      * getProtocolSession().write().
@@ -617,14 +633,12 @@
                 {
                     throw _lastFailoverException;
                 }
-                
+
                 _frameListeners.add(listener);
             }
             _protocolSession.writeFrame(frame);
 
-            AMQMethodEvent e = listener.blockForFrame(timeout);
-
-            return e;
+            return listener.blockForFrame(timeout);
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
         }
@@ -669,8 +683,7 @@
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                new AMQShortString("JMS client is closing the connection."),0,0);
-
+                                                                                                  new AMQShortString("JMS client is closing the connection."), 0, 0);
 
         final AMQFrame frame = body.generateFrame(0);
 
@@ -745,10 +758,6 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
-        if (_protocolSession != null)
-        {
-            _protocolSession.setStateManager(stateManager);
-        }
     }
 
     public AMQProtocolSession getProtocolSession()
@@ -778,7 +787,7 @@
 
     public MethodRegistry getMethodRegistry()
     {
-        return getStateManager().getMethodRegistry();
+        return _protocolSession.getMethodRegistry();
     }
 
     public ProtocolVersion getProtocolVersion()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Jul 15 10:06:16 2008
@@ -30,7 +30,6 @@
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,10 +37,10 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -67,8 +66,6 @@
 
     protected final IoSession _minaProtocolSession;
 
-    private AMQStateManager _stateManager;
-
     protected WriteFuture _lastWriteFuture;
 
     /**
@@ -86,7 +83,7 @@
      * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
      * first) with the subsequent content header and content bodies.
      */
-    private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+    private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
     private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
 
     /** Counter to ensure unique queue names */
@@ -97,26 +94,16 @@
 //    private VersionSpecificRegistry _registry =
 //        MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-
     private MethodRegistry _methodRegistry =
             MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-
     private MethodDispatcher _methodDispatcher;
 
-
     private final AMQConnection _connection;
     private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
     {
-        this(protocolHandler, protocolSession, connection, new AMQStateManager());
-
-    }
-
-    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
-        AMQStateManager stateManager)
-    {
         _protocolHandler = protocolHandler;
         _minaProtocolSession = protocolSession;
         _minaProtocolSession.setAttachment(this);
@@ -124,11 +111,9 @@
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
         // fixme - real value needed
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _stateManager = stateManager;
-        _stateManager.setProtocolSession(this);
         _protocolVersion = connection.getProtocolVersion();
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
-                                                                 stateManager);
+                                                                           this);
         _connection = connection;
 
     }
@@ -161,14 +146,7 @@
 
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
-    }
-
-    public void setStateManager(AMQStateManager stateManager)
-    {
-        _stateManager = stateManager;
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
-                                                                 stateManager);         
+        return _protocolHandler.getStateManager();
     }
 
     public String getVirtualHost()
@@ -238,9 +216,9 @@
     public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
     {
         final int channelId = message.getChannelId();
-        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
         {
-            _channelId2UnprocessedMsgArray[channelId] = message;    
+            _channelId2UnprocessedMsgArray[channelId] = message;
         }
         else
         {
@@ -251,17 +229,16 @@
     public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
     {
         final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
-                                                               : _channelId2UnprocessedMsgMap.get(channelId);
-
+                                       : _channelId2UnprocessedMsgMap.get(channelId);
 
         if (msg == null)
         {
-            throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
+            throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
         }
 
         if (msg.getContentHeader() != null)
         {
-            throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null);
+            throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
         }
 
         msg.setContentHeader(contentHeader);
@@ -275,7 +252,7 @@
     {
         UnprocessedMessage_0_8 msg;
         final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
-        if(fastAccess)
+        if (fastAccess)
         {
             msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
         }
@@ -291,7 +268,7 @@
 
         if (msg.getContentHeader() == null)
         {
-            if(fastAccess)
+            if (fastAccess)
             {
                 _channelId2UnprocessedMsgArray[channelId] = null;
             }
@@ -333,7 +310,7 @@
     {
         AMQSession session = getSession(channelId);
         session.messageReceived(msg);
-        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
         {
             _channelId2UnprocessedMsgArray[channelId] = null;
         }
@@ -431,12 +408,12 @@
         return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
     }
 
-    public void closeProtocolSession()
+    public void closeProtocolSession() throws AMQException
     {
         closeProtocolSession(true);
     }
 
-    public void closeProtocolSession(boolean waitLast)
+    public void closeProtocolSession(boolean waitLast) throws AMQException
     {
         _logger.debug("Waiting for last write to join.");
         if (waitLast && (_lastWriteFuture != null))
@@ -446,6 +423,14 @@
 
         _logger.debug("Closing protocol session");
         final CloseFuture future = _minaProtocolSession.close();
+
+        // There is no recovery we can do if the join on the close fails, so simply mark the connection CLOSED
+        // then wait for the connection to close.
+        // ritchiem: Could this release BlockingWaiters too early? The close has been done as much as possible so any
+        // error now shouldn't matter.
+
+        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+
         future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
@@ -489,9 +474,9 @@
     {
         _protocolVersion = pv;
         _methodRegistry = MethodRegistry.getMethodRegistry(pv);
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
 
-      //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+        //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
     }
 
     public byte getProtocolMinorVersion()
@@ -524,12 +509,12 @@
         return _methodDispatcher;
     }
 
-
     public void setTicket(int ticket, int channelId)
     {
         final AMQSession session = getSession(channelId);
         session.setTicket(ticket);
     }
+
     public void setMethodDispatcher(MethodDispatcher methodDispatcher)
     {
         _methodDispatcher = methodDispatcher;
@@ -545,4 +530,9 @@
     {
         _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
     }
+
+    public void notifyError(Exception error)
+    {
+        _protocolHandler.propagateExceptionToAllWaiters(error);
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Jul 15 10:06:16 2008
@@ -20,9 +20,14 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.util.BlockingWaiter;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -54,38 +59,17 @@
  * </table>
  *
  * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
- *       methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
- *       seem to use it. So wrapping the listeners is possible.
- *
- * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
- *       overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
- *       behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
- *       method has been received.
- *
- * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
- *       for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
- *       when this happens. At the very least, restore the interrupted status flag.
- *
+ * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
  * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
- *       check that SynchronousQueue has a non-blocking put method available.
+ * check that SynchronousQueue has a non-blocking put method available.
  */
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
 {
-    /** This flag is used to indicate that the blocked for method has been received. */
-    private volatile boolean _ready = false;
-
-    /** Used to protect the shared event and ready flag between the producer and consumer. */
-    private final Object _lock = new Object();
-
-    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
-    private volatile Exception _error;
 
     /** Holds the channel id for the channel upon which this listener is waiting for a response. */
     protected int _channelId;
 
-    /** Holds the incoming method. */
-    protected AMQMethodEvent _doneEvt = null;
-
     /**
      * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
      *
@@ -104,7 +88,14 @@
      *
      * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
      */
-    public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+    public abstract boolean processMethod(int channelId, AMQMethodBody frame);
+
+    public boolean process(AMQMethodEvent evt)
+    {
+        AMQMethodBody method = evt.getMethod();
+
+        return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+    }
 
     /**
      * Informs this listener that an AMQP method has been received.
@@ -113,37 +104,9 @@
      *
      * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
      */
-    public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt)
     {
-        AMQMethodBody method = evt.getMethod();
-
-        /*try
-        {*/
-        boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
-
-        if (ready)
-        {
-            // we only update the flag from inside the synchronized block
-            // so that the blockForFrame method cannot "miss" an update - it
-            // will only ever read the flag from within the synchronized block
-            synchronized (_lock)
-            {
-                _doneEvt = evt;
-                _ready = ready;
-                _lock.notify();
-            }
-        }
-
-        return ready;
-
-        /*}
-        catch (AMQException e)
-        {
-            error(e);
-            // we rethrow the error here, and the code in the frame dispatcher will go round
-            // each listener informing them that an exception has been thrown
-            throw e;
-        }*/
+        return received(evt);
     }
 
     /**
@@ -159,75 +122,15 @@
      */
     public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
     {
-        synchronized (_lock)
+        try
         {
-            while (!_ready)
-            {
-                try
-                {
-                    if (timeout == -1)
-                    {
-                        _lock.wait();
-                    }
-                    else
-                    {
-
-                        _lock.wait(timeout);
-                        if (!_ready)
-                        {
-                            _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
-                            _ready = true;
-                        }
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                    // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-                    // if (!_ready && timeout != -1)
-                    // {
-                    // _error = new AMQException("Server did not respond timely");
-                    // _ready = true;
-                    // }
-                }
-            }
+            return (AMQMethodEvent) block(timeout);
         }
-
-        if (_error != null)
+        finally
         {
-            if (_error instanceof AMQException)
-            {
-                throw (AMQException) _error;
-            }
-            else if (_error instanceof FailoverException)
-            {
-                // This should ensure that FailoverException is not wrapped and can be caught.
-                throw (FailoverException) _error; // needed to expose FailoverException.
-            }
-            else
-            {
-                throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
-            }
+            //Prevent any more errors being notified to this waiter.
+            close();
         }
-
-        return _doneEvt;
     }
 
-    /**
-     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
-     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
-     *
-     * @param e
-     */
-    public void error(Exception e)
-    {
-        // set the error so that the thread that is blocking (against blockForFrame())
-        // can pick up the exception and rethrow to the caller
-        _error = e;
-
-        synchronized (_lock)
-        {
-            _ready = true;
-            _lock.notify();
-        }
-    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Jul 15 10:06:16 2008
@@ -28,15 +28,30 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
- * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
- * there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/>
+ * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
+ *
+ * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler}, which is the sole point of reference, so that
+ * as the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
+ *
+ * The StateManager lets any component wait for a state change to occur by using the following sequence.
+ *
+ * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
+ * <li> // Perform action that will cause state change
+ * <li>waiter.await();
+ *
+ * The two step process is required as there is an inherent race condition between starting a process that will cause
+ * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
+ * 
+ *
  */
-public class AMQStateManager 
+public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
 
@@ -45,16 +60,13 @@
     /** The current state */
     private AMQState _currentState;
 
-
-    /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
-     * AMQFrame.
-     */
-
-
     private final Object _stateLock = new Object();
+
     private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
 
+    protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+    private Exception _lastException;
+
     public AMQStateManager()
     {
         this(null);
@@ -62,18 +74,15 @@
 
     public AMQStateManager(AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+        this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
     }
 
-    protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+    protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
     {
         _protocolSession = protocolSession;
         _currentState = state;
-
     }
 
-
-
     public AMQState getCurrentState()
     {
         return _currentState;
@@ -86,117 +95,101 @@
         synchronized (_stateLock)
         {
             _currentState = newState;
-            _stateLock.notifyAll();
+
+            _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
+
+            for (StateWaiter waiter : _waiters)
+            {
+                waiter.received(newState);
+            }
         }
     }
 
-
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
-
         B method = evt.getMethod();
-        
+
         //    StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
         method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
         return true;
     }
 
+    /**
+     * Setting of the ProtocolSession will be required when Failover has been successfully completed.
+     *
+     * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
+     * connection to the network.
+     *
+     * @param session The new protocol session
+     */
+    public void setProtocolSession(AMQProtocolSession session)
+    {
+        _logger.error("Setting ProtocolSession:" + session);
+        _protocolSession = session;
+    }
 
-    public void attainState(final AMQState s) throws Exception
+    /**
+     * Propagate error to waiters
+     *
+     * @param error The error to propagate.
+     */
+    public void error(Exception error)
     {
-        synchronized (_stateLock)
+        if (_waiters.size() == 0)
         {
-            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
-            long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
-            while ((_currentState != s) && (waitTime > 0))
-            {
-                try
-                {
-                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.warn("Thread interrupted");
-                    if (_protocolSession.getAMQConnection().getLastException() != null)
-                    {
-                        throw _protocolSession.getAMQConnection().getLastException();
-                    }
-
-                }
-
-                if (_currentState != s)
-                {
-                    waitTime = waitUntilTime - System.currentTimeMillis();
-                }
-            }
-
-            if (_currentState != s)
-            {
-                _logger.warn("State not achieved within permitted time.  Current state " + _currentState
-                             + ", desired state: " + s);
-                throw new AMQException(null, "State not achieved within permitted time.  Current state " + _currentState
-                                             + ", desired state: " + s, null);
-            }
+            _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+            _lastException = error;
+        }
+        for (StateWaiter waiter : _waiters)
+        {
+            _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+            waiter.error(error);
         }
-
-        // at this point the state will have changed.
     }
 
-    public AMQProtocolSession getProtocolSession()
+    /**
+     * This provides a single place that the maximum time for state change to occur can be accessed.
+     * It is currently set via System property amqj.MaximumStateWait
+     *
+     * @return long Milliseconds value for a timeout
+     */
+    public long getWaitTimeout()
     {
-        return _protocolSession;
+        return MAXIMUM_STATE_WAIT_TIME;
     }
 
-    public void setProtocolSession(AMQProtocolSession session)
+    /**
+     * Create and add a new waiter to the notification list.
+     * @param states The waiter will attempt to wait for one of this set of desired states to be achieved.
+     * @return the created StateWaiter.
+     */
+    public StateWaiter createWaiter(Set<AMQState> states)
     {
-        _protocolSession = session;
-    }
+        final StateWaiter waiter;
+        synchronized (_stateLock)
+        {
+            waiter = new StateWaiter(this, _currentState, states);
 
-    public MethodRegistry getMethodRegistry()
-    {
-        return getProtocolSession().getMethodRegistry();
+            _waiters.add(waiter);
+        }
+
+        return waiter;
     }
 
-    public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+    /**
+     * Remove the waiter from the notification list.
+     * @param waiter The waiter to remove.
+     */
+    public void removeWaiter(StateWaiter waiter)
     {
         synchronized (_stateLock)
         {
-            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
-            long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
-            while (!stateSet.contains(_currentState) && (waitTime > 0))
-            {
-                try
-                {
-                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.warn("Thread interrupted");
-                    if (_protocolSession.getAMQConnection().getLastException() != null)
-                    {
-                        throw new AMQException(null, "Could not attain state due to exception",
-                                _protocolSession.getAMQConnection().getLastException());
-                    }
-                }
-
-                if (!stateSet.contains(_currentState))
-                {
-                    waitTime = waitUntilTime - System.currentTimeMillis();
-                }
-            }
-
-            if (!stateSet.contains(_currentState))
-            {
-                _logger.warn("State not achieved within permitted time.  Current state " + _currentState
-                             + ", desired state: " + stateSet);
-                throw new AMQException(null, "State not achieved within permitted time.  Current state " + _currentState
-                                       + ", desired state: " + stateSet, null);
-            }
-            return _currentState;
+            _waiters.remove(waiter);
         }
+    }
 
-
+    public Exception getLastException()
+    {
+        return _lastException;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Tue Jul 15 10:06:16 2008
@@ -33,6 +33,6 @@
 public interface StateAwareMethodListener<B extends AMQMethodBody>
 {
 
-    void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+    void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException;
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Tue Jul 15 10:06:16 2008
@@ -20,103 +20,110 @@
  */
 package org.apache.qpid.client.state;
 
+import org.apache.qpid.client.util.BlockingWaiter;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.AMQException;
-
-import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Set;
 
 /**
- * Waits for a particular state to be reached.
+ * This is an implementation of the {@link BlockingWaiter} to provide error handling and a waiting mechanism for state
+ * changes.
+ *
+ * On construction the current state and a set of States to await for is provided.
+ *
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
+ * a desired state then await() returns immediately.
+ *
+ * Otherwise it will block for the set timeout for a desired state to be achieved.
+ *
+ * The state changes are notified via the {@link #process} method.
+ *
+ * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method.
+ *
  */
-public class StateWaiter implements StateListener
+public class StateWaiter extends BlockingWaiter<AMQState>
 {
     private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
 
-    private final AMQState _state;
-
-    private volatile boolean _newStateAchieved;
-
-    private volatile Throwable _throwable;
-
-    private final Object _monitor = new Object();
-    private static final long TIME_OUT = 1000 * 60 * 2;
-
-    public StateWaiter(AMQState state)
+    Set<AMQState> _awaitStates;
+    private AMQState _startState;
+    private AMQStateManager _stateManager;
+
+    /**
+     *
+     * @param stateManager The StateManager
+     * @param currentState
+     * @param awaitStates
+     */
+    public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
     {
-        _state = state;
+        _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+        _stateManager = stateManager;
+        _awaitStates = awaitStates;
+        _startState = currentState;
     }
 
-    public void waituntilStateHasChanged() throws AMQException
+    /**
+     * When the state is changed this StateWaiter is notified to process the change.
+     *
+     * @param state The new state that has been achieved.
+     * @return
+     */
+    public boolean process(AMQState state)
     {
-        synchronized (_monitor)
-        {
-            //
-            // The guard is required in case we are woken up by a spurious
-            // notify().
-            //
-            while (!_newStateAchieved && (_throwable == null))
-            {
-                try
-                {
-                    _logger.debug("State " + _state + " not achieved so waiting...");
-                    _monitor.wait(TIME_OUT);
-                    // fixme this won't cause the timeout to exit the loop. need to set _throwable
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.debug("Interrupted exception caught while waiting: " + e, e);
-                }
-            }
-        }
+        return _awaitStates.contains(state);
+    }
 
-        if (_throwable != null)
-        {
-            _logger.debug("Throwable reached state waiter: " + _throwable);
-            if (_throwable instanceof AMQException)
-            {
-                throw (AMQException) _throwable;
-            }
-            else
-            {
-                throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
-            }
-        }
+    /**
+     * Wait for the required State to be achieved within the default timeout.
+     * @return The achieved state that was requested.
+     * @throws AMQException The exception that prevented the required state from being achieved.
+     */
+    public AMQState await() throws AMQException
+    {
+        return await(_stateManager.getWaitTimeout());
     }
 
-    public void stateChanged(AMQState oldState, AMQState newState)
+    /**
+     * Wait for the required State to be achieved.
+     *
+     * <b>It is the responsibility of this class to remove the waiter from the StateManager.</b>
+     *
+     * @param timeout The time in milliseconds to wait for any of the states to be achieved.
+     * @return The achieved state that was requested.
+     * @throws AMQException The exception that prevented the required state from being achieved.
+     */
+    public AMQState await(long timeout) throws AMQException
     {
-        synchronized (_monitor)
+        try
         {
-            if (_logger.isDebugEnabled())
+            if (process(_startState))
             {
-                _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState);
+                return _startState;
             }
 
-            if (_state == newState)
+            try
             {
-                _newStateAchieved = true;
-
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("New state reached so notifying monitor");
-                }
+                return (AMQState) block(timeout);
+            }
+            catch (FailoverException e)
+            {
+                _logger.error("Failover occured whilst waiting for states:" + _awaitStates);
 
-                _monitor.notifyAll();
+                e.printStackTrace();
+                return null;
             }
         }
-    }
-
-    public void error(Throwable t)
-    {
-        synchronized (_monitor)
+        finally
         {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("exceptionThrown called");
-            }
+            //Prevent any more errors being notified to this waiter.
+            close();
 
-            _throwable = t;
-            _monitor.notifyAll();
+            //Remove the waiter from the notification list in the state manager
+            _stateManager.removeWaiter(this);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java Tue Jul 15 10:06:16 2008
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.client.state.listener;
 
-import org.apache.qpid.AMQException;
+
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.framing.AMQMethodBody;
 
@@ -34,7 +34,7 @@
         _expectedClass = expectedClass;
     }
 
-    public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+    public boolean processMethod(int channelId, AMQMethodBody frame)
     {
         return _expectedClass.isInstance(frame);
     }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=676978&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Tue Jul 15 10:06:16 2008
@@ -0,0 +1,348 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+
+/**
+ * BlockingWaiter is a 'rendezvous' which delegates handling of
+ * incoming Objects to a listener implemented as a sub-class of this and hands off the process or
+ * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
+ * differs from a 'rendezvous' in that sense.
+ *
+ * <p/>BlockingWaiters are used to coordinate when waiting for an event that expects a response.
+ * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register
+ * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
+ * have been completed.
+ *
+ * <p/>The {@link #process} must return <tt>true</tt> on any incoming method that it handles. This indicates to
+ * this listener that the object just processed ends the waiting process.
+ *
+ * <p/>Errors from the producer are rethrown to the consumer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations </th>
+ * <tr><td> Accept generic objects as events for processing via {@link #process}. <td>
+ * <tr><td> Delegate handling and understanding of the object to a concrete implementation. <td>
+ * <tr><td> Block until {@link #process} determines that waiting is no longer required <td>
+ * <tr><td> Propagate the most recent exception to the consumer.<td>
+ * </table>
+ *
+ * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful
+ * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
+ * when this happens. At the very least, restore the interrupted status flag.
+ * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
+ * check that SynchronousQueue has a non-blocking put method available.
+ */
+public abstract class BlockingWaiter<T>
+{
+    /** This flag is used to indicate that the blocked for method has been received. */
+    private volatile boolean _ready = false;
+
+    /** This flag is used to indicate that the received error has been processed. */
+    private volatile boolean _errorAck = false;
+
+    /** Used to protect the shared event and ready flag between the producer and consumer. */
+    private final ReentrantLock _lock = new ReentrantLock();
+
+    /** Used to signal that a method has been received */
+    private final Condition _receivedCondition = _lock.newCondition();
+
+    /** Used to signal that a error has been processed */
+    private final Condition _errorConditionAck = _lock.newCondition();
+
+    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
+    private volatile Exception _error;
+
+    /** Holds the incoming Object. */
+    protected Object _doneObject = null;
+    private AtomicBoolean _waiting = new AtomicBoolean(false);
+    private boolean _closed = false;
+
+    /**
+     * Delegates processing of the incoming object to the handler.
+     *
+     * @param object The object to process.
+     *
+     * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
+     */
+    public abstract boolean process(T object);
+
+    /**
+     * An Object has been received and should be processed to see if our wait condition has been reached.
+     *
+     * @param object The object received.
+     *
+     * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
+     */
+    public boolean received(T object)
+    {
+
+        boolean ready = process(object);
+
+        if (ready)
+        {
+            // we only update the flag while holding the lock
+            // so that the block method cannot "miss" an update - it
+            // will only ever read the flag while holding the same lock
+            _lock.lock();
+            try
+            {
+                _doneObject = object;
+                _ready = ready;
+                _receivedCondition.signal();
+            }
+            finally
+            {
+                _lock.unlock();
+            }
+        }
+
+        return ready;
+    }
+
+    /**
+     * Blocks until an object is received that is handled by process, or the specified timeout
+     * has passed.
+     *
+     * Once closed any attempt to wait will throw an exception.
+     *
+     * @param timeout The timeout in milliseconds.
+     *
+     * @return The object that resolved the blocking.
+     *
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public Object block(long timeout) throws AMQException, FailoverException
+    {
+        long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
+
+        _lock.lock();
+
+        try
+        {
+            if (_closed)
+            {
+                throw throwClosedException();
+            }
+
+            if (_error == null)
+            {
+                _waiting.set(true);
+
+                while (!_ready)
+                {
+                    try
+                    {
+                        if (timeout == -1)
+                        {
+                            _receivedCondition.await();
+                        }
+                        else
+                        {
+                            nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
+
+                            if (nanoTimeout <= 0 && !_ready && _error == null)
+                            {
+                                _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
+                                _ready = true;
+                            }
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        System.err.println(e.getMessage());
+                        // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
+                        // if (!_ready && timeout != -1)
+                        // {
+                        // _error = new AMQException("Server did not respond timely");
+                        // _ready = true;
+                        // }
+                    }
+                }
+            }
+
+            if (_error != null)
+            {
+                if (_error instanceof AMQException)
+                {
+                    throw (AMQException) _error;
+                }
+                else if (_error instanceof FailoverException)
+                {
+                    // This should ensure that FailoverException is not wrapped and can be caught.
+                    throw (FailoverException) _error; // needed to expose FailoverException.
+                }
+                else
+                {
+                    throw new AMQException("Woken up due to " + _error.getClass(), _error);
+                }
+            }
+
+        }
+        finally
+        {
+            _waiting.set(false);
+
+            //Release Error handling thread
+            if (_error != null)
+            {
+                _errorAck = true;
+                _errorConditionAck.signal();
+
+                _error = null;
+            }
+            _lock.unlock();
+        }
+
+        return _doneObject;
+    }
+
+    /**
+     * This is a callback, called when an error has occurred that should interrupt any waiter.
+     * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
+     *
+     * Once closed any notification of an exception will be ignored.
+     *
+     * @param e The exception being propagated.
+     */
+    public void error(Exception e)
+    {
+        // set the error so that the thread that is blocking (in block())
+        // can pick up the exception and rethrow to the caller
+
+        _lock.lock();
+
+        if (_closed)
+        {
+            return;
+        }
+
+        if (_error == null)
+        {
+            _error = e;
+        }
+        else
+        {
+            System.err.println("WARNING: new error arrived while old one not yet processed");
+        }
+
+        try
+        {
+            if (_waiting.get())
+            {
+
+                _ready = true;
+                _receivedCondition.signal();
+
+                while (!_errorAck)
+                {
+                    try
+                    {
+                        _errorConditionAck.await();
+                    }
+                    catch (InterruptedException e1)
+                    {
+                        System.err.println(e.getMessage());
+                    }
+                }
+                _errorAck = false;
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Close this Waiter so that no more errors are processed.
+     * This is a preventative method to ensure that a second error thread does not get stuck in the error method after
+     * the await has returned. This has not happened in practice, but if two errors occur on the Connection at
+     * the same time then it is conceivably possible for the second to get stuck if the first one is processed by a
+     * waiter.
+     *
+     * Once closed any attempt to wait will throw an exception.
+     * Any notification of an exception will be ignored.
+     */
+    public void close()
+    {
+        _lock.lock();
+        try
+        {
+            //if we have already closed then our job is done.
+            if (_closed)
+            {
+                return;
+            }
+
+            //Close Waiter so no more exceptions are processed
+            _closed = true;
+
+            //Wake up any await() threads
+
+            //If we are waiting then use the error() to wake them up.
+            if (_waiting.get())
+            {
+                error(throwClosedException());
+            }
+            //If they are not waiting then there is nothing to do.
+
+            // Wake up any error handling threads
+
+            if (!_errorAck)
+            {
+                _errorAck = true;
+                _errorConditionAck.signal();
+
+                _error = null;
+            }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Helper method to generate a closed Exception.
+     *
+     * todo: This should be converted to something more friendly. 
+     *
+     * @return AMQException to throw to waiters when the Waiter is closed.
+     */
+    private AMQException throwClosedException()
+    {
+        return new AMQException(null, "Waiter was closed.", null);
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Tue Jul 15 10:06:16 2008
@@ -27,12 +27,10 @@
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,12 +46,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager,  ChannelCloseBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session,  ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
 
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Tue Jul 15 10:06:16 2008
@@ -33,11 +33,12 @@
 
     public void testFailoverURL() throws URLSyntaxException
     {
-        String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+        String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
 
         ConnectionURL connectionurl = new AMQConnectionURL(url);
 
         assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
+        assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));        
         assertTrue(connectionurl.getUsername().equals("ritchiem"));
         assertTrue(connectionurl.getPassword().equals("bob"));
         assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -276,7 +277,7 @@
 
     public void testSingleTransportMultiOptionURL() throws URLSyntaxException
     {
-        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
 
         ConnectionURL connectionurl = new AMQConnectionURL(url);
 
@@ -493,8 +494,38 @@
         }
     }
 
+    public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getFailoverMethod() == null);
+        assertTrue(connectionurl.getUsername().equals("guest"));
+        assertTrue(connectionurl.getPassword().equals("guest"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+        assertTrue(connectionurl.getBrokerCount() == 1);
+
+        BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+        assertTrue(service.getTransport().equals("tcp"));
+
+        
+        assertTrue(service.getHost().equals("localhost"));
+        assertTrue(service.getPort() == 5672);
+        assertEquals("jim",service.getProperty("foo"));
+        assertEquals("bob",service.getProperty("bar"));
+        assertEquals("jimmy",service.getProperty("fred"));
+
+        assertTrue(connectionurl.getOption("routingkey").equals("jim"));
+        assertTrue(connectionurl.getOption("timeout").equals("200"));
+        assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
+    }
+
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(ConnectionURLTest.class);
     }
 }
+

Modified: incubator/qpid/trunk/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common.xml?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common.xml (original)
+++ incubator/qpid/trunk/qpid/java/common.xml Tue Jul 15 10:06:16 2008
@@ -121,7 +121,7 @@
 
       ${build.bin}
 
-  ant test [ -Dtest=&lt;pattern&gt; ] [ report ]
+  ant test [ -Dtest=&lt;pattern&gt; ] [-Dprofile=&lt;profile&gt;] [ report ]
 
     Execute unit tests and place the output in the build results
     directory:
@@ -152,7 +152,9 @@
     default to running all available tests for the project or module
     depending on the current working directory.
 
-    Finally it can be useful to append the report target in order to
+    Test Reports
+
+    It can be useful to append the report target in order to
     generate an html summary of the tests that were just run. The
     following command will run both the MongooseTest and GooseTest
     test cases and generate an html summary of the results:
@@ -162,6 +164,16 @@
     See the documentation for the "ant report" target for more details
     on the generated report.
 
+    Test Profiles
+
+    There are a number of profiles defined for running the test suite.
+    These test profiles define how the test should be run. If the test
+    profile is not specified then 'default.testprofile' is utilised.
+    This runs the system tests against the Java InVM broker. Additional
+    test profiles exist as follows:
+
+        cpp : Runs against the built cpp tree broker.
+
   ant report
 
     The report target will generate an html summary of the current

Modified: incubator/qpid/trunk/qpid/java/systests/build.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/build.xml?rev=676978&r1=676977&r2=676978&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/build.xml (original)
+++ incubator/qpid/trunk/qpid/java/systests/build.xml Tue Jul 15 10:06:16 2008
@@ -23,7 +23,7 @@
     <property name="module.depends" value="client broker common junit-toolkit"/>
     <property name="module.test.src" location="src/main/java"/>
     <property name="module.test.excludes"
-              value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java,**/SimpleACLTest.java"/>
+              value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>
 
     <import file="../module.xml"/>