You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/15 13:52:47 UTC

svn commit: r496302 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: protocol/AMQProtocolHandler.java protocol/AMQProtocolSession.java state/AMQStateManager.java

Author: rgreig
Date: Mon Jan 15 04:52:46 2007
New Revision: 496302

URL: http://svn.apache.org/viewvc?view=rev&rev=496302
Log:
QPID-294 : Patch supplied by Rob Godfrey - Fix race condition on client connection

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 15 04:52:46 2007
@@ -47,6 +47,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 
+
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
@@ -69,7 +70,7 @@
      */
     private volatile AMQProtocolSession _protocolSession;
 
-//    private AMQStateManager _stateManager = new AMQStateManager();
+    private AMQStateManager _stateManager = new AMQStateManager();
 
     private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
 
@@ -142,7 +143,7 @@
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
         }
 
-        _protocolSession = new AMQProtocolSession(this, session, _connection);
+        _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }
 
@@ -278,7 +279,7 @@
      */
     public void propagateExceptionToWaiters(Exception e)
     {
-        _protocolSession.getStateManager().error(e);
+        getStateManager().error(e);
         if(!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -317,7 +318,7 @@
             try
             {
 
-                boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
+                boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
@@ -329,12 +330,12 @@
                 }
                 if (!wasAnyoneInterested)
                 {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"  + _frameListeners);
                 }
             }
             catch (AMQException e)
             {
-                _protocolSession.getStateManager().error(e);
+                getStateManager().error(e);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
@@ -395,7 +396,7 @@
   */
     public void attainState(AMQState s) throws AMQException
     {
-        _protocolSession.getStateManager().attainState(s);
+        getStateManager().attainState(s);
     }
 
     /**
@@ -487,7 +488,7 @@
 
     public void closeConnection() throws AMQException
     {
-        _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
+        getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -557,11 +558,12 @@
 
     public AMQStateManager getStateManager()
     {
-        return _protocolSession.getStateManager();
+        return _stateManager;
     }
 
     public void setStateManager(AMQStateManager stateManager)
     {
+        _stateManager = stateManager;
         _protocolSession.setStateManager(stateManager);
     }
     

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 15 04:52:46 2007
@@ -113,6 +113,18 @@
         _stateManager = new AMQStateManager(this);
     }
 
+    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+    {
+        _protocolHandler = protocolHandler;
+        _minaProtocolSession = protocolSession;
+        // properties of the connection are made available to the event handlers
+        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+        _stateManager = stateManager;
+        _stateManager.setProtocolSession(this);
+                
+    }
+
     public void init()
     {
         // start the process of setting up the connection. This is the first place that

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 15 04:52:46 2007
@@ -41,7 +41,7 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
-    private final AMQProtocolSession _protocolSession;
+    private AMQProtocolSession _protocolSession;
 
     /**
      * The current state
@@ -56,6 +56,12 @@
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
 
+    public AMQStateManager()
+    {
+        this(null);
+    }
+    
+
     public AMQStateManager(AMQProtocolSession protocolSession)
     {
         this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
@@ -229,5 +235,15 @@
             sw.waituntilStateHasChanged();
         }
         // at this point the state will have changed.
+    }
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void setProtocolSession(AMQProtocolSession session)
+    {
+        _protocolSession = session;
     }
 }