You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/15 15:20:39 UTC
svn commit: r496326 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/state/
client/src/main/java/org/apache/qpid/client/failover/
client/src/main/java/org/apa...
Author: kpvdr
Date: Mon Jan 15 06:20:37 2007
New Revision: 496326
URL: http://svn.apache.org/viewvc?view=rev&rev=496326
Log:
Merged the refactor to a common AMQMethodListener class on trunk, plus the race condition fix of Robert Godfrey. This opens the way for Request and Response managers to use a common event dispatch for both client and server.
Added:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
- copied, changed from r496305, incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
Removed:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 06:20:37 2007
@@ -42,6 +42,7 @@
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
@@ -111,7 +112,16 @@
AMQCodecFactory codecFactory)
throws AMQException
{
- this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _minaProtocolSession = session;
+ session.setAttachment(this);
+ _frameListeners.add(_stateManager);
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _codecFactory = codecFactory;
+ _managedObject = createMBean();
+ _managedObject.register();
+// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -263,7 +273,7 @@
// boolean wasAnyoneInterested = false;
// for (AMQMethodListener listener : _frameListeners)
// {
-// wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+// wasAnyoneInterested = listener.methodReceived(evt) ||
// wasAnyoneInterested;
// }
// if (!wasAnyoneInterested)
@@ -276,7 +286,7 @@
// _logger.error("Closing channel due to: " + e.getMessage());
// writeFrame(e.getCloseFrame(frame.channel));
// }
-// catch (AMQException e)
+// catch (Exception e)
// {
// for (AMQMethodListener listener : _frameListeners)
// {
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Jan 15 06:20:37 2007
@@ -25,7 +25,7 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.log4j.Logger;
@@ -44,6 +44,9 @@
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
*/
@@ -58,13 +61,16 @@
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
- public AMQStateManager()
+ public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register)
+ protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _protocolSession = protocolSession;
_currentState = initial;
if (register)
{
@@ -158,7 +164,7 @@
}
}
- public void error(AMQException e)
+ public void error(Exception e)
{
_logger.error("State manager received error notification: " + e, e);
for (StateListener l : _stateListeners)
@@ -167,15 +173,12 @@
}
}
- public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt);
+ handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
return true;
}
return false;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Mon Jan 15 06:20:37 2007
@@ -89,7 +89,7 @@
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager());
+ _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_amqProtocolHandler.setStateManager(existingStateManager);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 15 06:20:37 2007
@@ -46,6 +46,7 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.ssl.BogusSSLContextFactory;
@@ -148,7 +149,7 @@
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
- _protocolSession = new AMQProtocolSession(this, session, _connection);
+ _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -284,7 +285,7 @@
*/
public void propagateExceptionToWaiters(Exception e)
{
- _stateManager.error(e);
+ getStateManager().error(e);
final Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
@@ -321,11 +322,11 @@
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
catch (AMQException e)
@@ -383,7 +384,7 @@
public void attainState(AMQState s) throws AMQException
{
- _stateManager.attainState(s);
+ getStateManager().attainState(s);
}
/**
@@ -471,7 +472,7 @@
public void closeConnection() throws AMQException
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -547,6 +548,12 @@
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
FailoverState getFailoverState()
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 15 06:20:37 2007
@@ -31,6 +31,7 @@
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -66,6 +67,8 @@
protected static final String SASL_CLIENT = "SASLClient";
protected final IoSession _minaProtocolSession;
+
+ private AMQStateManager _stateManager;
protected WriteFuture _lastWriteFuture;
@@ -102,6 +105,7 @@
{
_protocolHandler = null;
_minaProtocolSession = null;
+ _stateManager = new AMQStateManager(this);
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -110,6 +114,19 @@
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ _stateManager = new AMQStateManager(this);
+ }
+
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = protocolSession;
+ // properties of the connection are made available to the event handlers
+ _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+ _stateManager = stateManager;
+ _stateManager.setProtocolSession(this);
+
}
public void init()
@@ -139,6 +156,16 @@
public void setClientID(String clientID) throws JMSException
{
getAMQConnection().setClientID(clientID);
+ }
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
}
public String getVirtualHost()
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Mon Jan 15 06:20:37 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
AMQMethodBody method = evt.getMethod();
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 15 06:20:37 2007
@@ -22,8 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.handler.*;
-import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+ private AMQProtocolSession _protocolSession;
/**
* The current state
@@ -54,14 +55,20 @@
private final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
-
+
public AMQStateManager()
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(null);
}
- protected AMQStateManager(AMQState state, boolean register)
+ public AMQStateManager(AMQProtocolSession protocolSession)
{
+ this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+ }
+
+ protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+ {
+ _protocolSession = protocolSession;
_currentState = state;
if(register)
{
@@ -154,12 +161,12 @@
}
}
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, protocolSession, evt);
+ handler.methodReceived(this, _protocolSession, evt);
return true;
}
return false;
@@ -234,5 +241,15 @@
sw.waituntilStateHasChanged();
}
// at this point the state will have changed.
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void setProtocolSession(AMQProtocolSession session)
+ {
+ _protocolSession = session;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java Mon Jan 15 06:20:37 2007
@@ -52,7 +52,7 @@
public void handle(int channel, AMQMethodBody method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt, _session);
+ _stateMgr.methodReceived(evt);
}
private class SessionAdapter extends AMQProtocolSession
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Mon Jan 15 06:20:37 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -49,9 +50,9 @@
private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
private final MemberHandle _identity;
- protected ClientHandlerRegistry(MemberHandle local)
+ protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
_identity = local;
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java Mon Jan 15 06:20:37 2007
@@ -46,7 +46,8 @@
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory());
+ // TODO - FIX THIS!
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
}
private MethodHandlerFactory getHandlerFactory()
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Mon Jan 15 06:20:37 2007
@@ -33,6 +33,7 @@
import org.apache.qpid.framing.ClusterMembershipBody;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
}
void connect(String join) throws Exception
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Mon Jan 15 06:20:37 2007
@@ -63,7 +63,8 @@
{
super(host, port);
_local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
+ // TODO - FIX THIS
+ _legacyHandler = new ClientHandlerRegistry(local, null);
}
private void init(IoSession session)
Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Mon Jan 15 06:20:37 2007
@@ -27,6 +27,9 @@
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.HashMap;
import java.util.Map;
@@ -40,20 +43,23 @@
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry()
+ ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
++ AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s)
+ ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory)
+ ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
}
Copied: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (from r496305, incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=496326&p1=incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java&r1=496305&p2=incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Mon Jan 15 06:20:37 2007
@@ -18,12 +18,8 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -35,22 +31,18 @@
/**
* Invoked when a method frame has been received
* @param evt the event that contains the method and channel
- * @param protocolSession the protocol session associated with the event
* @return true if the handler has processed the method frame, false otherwise. Note
* that this does not prohibit the method event being delivered to subsequent listeners
* but can be used to determine if nobody has dealt with an incoming method frame.
- * @throws AMQException if an error has occurred. This exception will be delivered
+ * @throws Exception if an error has occurred. This exception will be delivered
* to all registered listeners using the error() method (see below) allowing them to
* perform cleanup if necessary.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
/**
* Callback when an error has occurred. Allows listeners to clean up.
* @param e
*/
- void error(AMQException e);
+ void error(Exception e);
}