You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:13:24 UTC

svn commit: r501011 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java

Author: rgreig
Date: Mon Jan 29 03:13:23 2007
New Revision: 501011

URL: http://svn.apache.org/viewvc?view=rev&rev=501011
Log:
QPID-313 : Patch supplied by Rob Godfrey - Call to attainState in makeBrokerConnection can miss the notification of state change.




Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=501011&r1=501010&r2=501011
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 29 03:13:23 2007
@@ -55,6 +55,8 @@
     private final Map _state2HandlersMap = new HashMap();
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+    private final Object _stateLock = new Object();
+    private static final long MAXIMUM_STATE_WAIT_TIME = 30000l;
 
     public AMQStateManager()
     {
@@ -127,17 +129,11 @@
     public void changeState(AMQState newState) throws AMQException
     {
         _logger.debug("State changing to " + newState + " from old state " + _currentState);
-        final AMQState oldState = _currentState;
-        _currentState = newState;
 
-        synchronized (_stateListeners)
+        synchronized (_stateLock)
         {
-            final Iterator it = _stateListeners.iterator();
-            while (it.hasNext())
-            {
-                final StateListener l = (StateListener) it.next();
-                l.stateChanged(oldState, newState);
-            }
+            _currentState = newState;
+            _stateLock.notifyAll();
         }
     }
 
@@ -204,36 +200,35 @@
         }
     }
 
-    public void addStateListener(StateListener listener)
-    {
-        _logger.debug("Adding state listener");
-        _stateListeners.add(listener);
-    }
-
-    public void removeStateListener(StateListener listener)
-    {
-        _stateListeners.remove(listener);
-    }
 
-    public void attainState(AMQState s) throws AMQException
+    public void attainState(final AMQState s) throws AMQException
     {
-        boolean needToWait = false;
-        StateWaiter sw = null;
-        synchronized (_stateListeners)
+        synchronized(_stateLock)
         {
-            if (_currentState != s)
+            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+            long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+            while(_currentState != s && waitTime > 0)
             {
-                _logger.debug("Adding state wait to reach state " + s);
-                sw = new StateWaiter(s);
-                addStateListener(sw);
-                // we use a boolean since we must release the lock before starting to wait
-                needToWait = true;
+                try
+                {
+                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+                }
+                catch (InterruptedException e)
+                {
+                    _logger.warn("Thread interrupted");
+                }
+                if(_currentState != s)
+                {
+                    waitTime = waitUntilTime - System.currentTimeMillis();
+                }
+            }
+            if(_currentState != s)
+            {
+                throw new AMQException("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
             }
         }
-        if (needToWait)
-        {
-            sw.waituntilStateHasChanged();
-        }
+
         // at this point the state will have changed.
     }