You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/15 13:52:47 UTC
svn commit: r496302 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: protocol/AMQProtocolHandler.java protocol/AMQProtocolSession.java state/AMQStateManager.java
Author: rgreig
Date: Mon Jan 15 04:52:46 2007
New Revision: 496302
URL: http://svn.apache.org/viewvc?view=rev&rev=496302
Log:
QPID-294 : Patch supplied by Rob Godfrey - Fix race condition on client connection
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 15 04:52:46 2007
@@ -47,6 +47,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+
public class AMQProtocolHandler extends IoHandlerAdapter
{
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
@@ -69,7 +70,7 @@
*/
private volatile AMQProtocolSession _protocolSession;
-// private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager = new AMQStateManager();
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
@@ -142,7 +143,7 @@
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
- _protocolSession = new AMQProtocolSession(this, session, _connection);
+ _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -278,7 +279,7 @@
*/
public void propagateExceptionToWaiters(Exception e)
{
- _protocolSession.getStateManager().error(e);
+ getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -317,7 +318,7 @@
try
{
- boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -329,12 +330,12 @@
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
catch (AMQException e)
{
- _protocolSession.getStateManager().error(e);
+ getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -395,7 +396,7 @@
*/
public void attainState(AMQState s) throws AMQException
{
- _protocolSession.getStateManager().attainState(s);
+ getStateManager().attainState(s);
}
/**
@@ -487,7 +488,7 @@
public void closeConnection() throws AMQException
{
- _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
+ getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -557,11 +558,12 @@
public AMQStateManager getStateManager()
{
- return _protocolSession.getStateManager();
+ return _stateManager;
}
public void setStateManager(AMQStateManager stateManager)
{
+ _stateManager = stateManager;
_protocolSession.setStateManager(stateManager);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 15 04:52:46 2007
@@ -113,6 +113,18 @@
_stateManager = new AMQStateManager(this);
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = protocolSession;
+ // properties of the connection are made available to the event handlers
+ _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+ _stateManager = stateManager;
+ _stateManager.setProtocolSession(this);
+
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496302&r1=496301&r2=496302
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 15 04:52:46 2007
@@ -41,7 +41,7 @@
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final AMQProtocolSession _protocolSession;
+ private AMQProtocolSession _protocolSession;
/**
* The current state
@@ -56,6 +56,12 @@
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ public AMQStateManager()
+ {
+ this(null);
+ }
+
+
public AMQStateManager(AMQProtocolSession protocolSession)
{
this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
@@ -229,5 +235,15 @@
sw.waituntilStateHasChanged();
}
// at this point the state will have changed.
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void setProtocolSession(AMQProtocolSession session)
+ {
+ _protocolSession = session;
}
}