You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:13:24 UTC
svn commit: r501011 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Author: rgreig
Date: Mon Jan 29 03:13:23 2007
New Revision: 501011
URL: http://svn.apache.org/viewvc?view=rev&rev=501011
Log:
QPID-313 : Patch supplied by Rob Godfrey - Call to attainState in makeBrokerConnection can miss the notification of state change.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=501011&r1=501010&r2=501011
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 29 03:13:23 2007
@@ -55,6 +55,8 @@
private final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ private final Object _stateLock = new Object();
+ private static final long MAXIMUM_STATE_WAIT_TIME = 30000l;
public AMQStateManager()
{
@@ -127,17 +129,11 @@
public void changeState(AMQState newState) throws AMQException
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
- final AMQState oldState = _currentState;
- _currentState = newState;
- synchronized (_stateListeners)
+ synchronized (_stateLock)
{
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.stateChanged(oldState, newState);
- }
+ _currentState = newState;
+ _stateLock.notifyAll();
}
}
@@ -204,36 +200,35 @@
}
}
- public void addStateListener(StateListener listener)
- {
- _logger.debug("Adding state listener");
- _stateListeners.add(listener);
- }
-
- public void removeStateListener(StateListener listener)
- {
- _stateListeners.remove(listener);
- }
- public void attainState(AMQState s) throws AMQException
+ public void attainState(final AMQState s) throws AMQException
{
- boolean needToWait = false;
- StateWaiter sw = null;
- synchronized (_stateListeners)
+ synchronized(_stateLock)
{
- if (_currentState != s)
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while(_currentState != s && waitTime > 0)
{
- _logger.debug("Adding state wait to reach state " + s);
- sw = new StateWaiter(s);
- addStateListener(sw);
- // we use a boolean since we must release the lock before starting to wait
- needToWait = true;
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+ if(_currentState != s)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ if(_currentState != s)
+ {
+ throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
}
}
- if (needToWait)
- {
- sw.waituntilStateHasChanged();
- }
+
// at this point the state will have changed.
}