You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/19 23:19:55 UTC
svn commit: r497974 [1/2] - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/state/ cluster/src/main/java/o...
Author: kpvdr
Date: Fri Jan 19 14:19:51 2007
New Revision: 497974
URL: http://svn.apache.org/viewvc?view=rev&rev=497974
Log:
Introduced channel close methods into AMQMinaProtocolSession.java; Refactored StateAwareMethodListener.java to simplify call and remove redundant parameters, reworked all affected handlers. Connected the AMQP version information to the protocol session in all broker handlers.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Jan 19 14:19:51 2007
@@ -47,17 +47,12 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9));
- protocolSession.closeChannel(evt.getChannelId());
+ protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Fri Jan 19 14:19:51 2007
@@ -45,10 +45,10 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+ protocolSession.removeChannel(evt.getChannelId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Fri Jan 19 14:19:51 2007
@@ -48,8 +48,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
{
ChannelFlowBody body = evt.getMethod();
@@ -58,12 +57,11 @@
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody response = ChannelFlowOkBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- body.active); // active
+ AMQMethodBody response = ChannelFlowOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ body.active); // active
protocolSession.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Fri Jan 19 14:19:51 2007
@@ -47,15 +47,15 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
// XXX: Client id
- AMQMethodBody response = ChannelOpenOkBody.createMethodBody((byte)0, (byte)9, "XXX".getBytes());
+ AMQMethodBody response = ChannelOpenOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ "XXX".getBytes());
protocolSession.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -47,17 +47,16 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
final ConnectionCloseBody body = evt.getMethod();
_logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
body.replyText + " for " + protocolSession);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
try
{
protocolSession.closeSession();
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -46,8 +46,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
//todo should this not do more than just log the method?
@@ -55,7 +54,7 @@
try
{
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
protocolSession.closeSession();
}
catch (Exception e)
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -50,8 +50,7 @@
return Long.toString(System.currentTimeMillis());
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
ConnectionOpenBody body = evt.getMethod();
@@ -64,13 +63,12 @@
contextKey = generateClientID();
}
protocolSession.setContextKey(contextKey);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody response = ConnectionOpenOkBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- contextKey); // knownHosts
- stateManager.changeState(AMQState.CONNECTION_OPEN);
+ AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ contextKey); // knownHosts
+ protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -54,8 +54,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
{
ConnectionSecureOkBody body = evt.getMethod();
@@ -68,6 +67,9 @@
}
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ AMQStateManager stateManager = protocolSession.getStateManager();
+ byte major = protocolSession.getMajor();
+ byte minor = protocolSession.getMinor();
switch (authResult.status)
{
case ERROR:
@@ -75,15 +77,13 @@
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody close = ConnectionCloseBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte)0, (byte)9), // classId
- ConnectionCloseBody.getMethod((byte)0, (byte)9), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
+ AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+ major, minor, // AMQP version (major, minor)
+ ConnectionCloseBody.getClazz(major, minor), // classId
+ ConnectionCloseBody.getMethod(major, minor), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName()); // replyText
protocolSession.writeResponse(evt, close);
disposeSaslServer(protocolSession);
break;
@@ -93,25 +93,21 @@
// TODO: Check the value of channelMax here: This should be the max
// value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
// not Integer.MAX_VALUE (which is signed 4 bytes).
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody tune = ConnectionTuneBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
+ AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
+ major, minor, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeResponse(evt, tune);
disposeSaslServer(protocolSession);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- authResult.challenge); // challenge
+ AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
+ major, minor, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeResponse(evt, challenge);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -60,8 +60,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
{
final ConnectionStartOkBody body = evt.getMethod();
@@ -84,6 +83,7 @@
protocolSession.setClientProperties(body.clientProperties);
}
+ AMQStateManager stateManager = protocolSession.getStateManager();
switch (authResult.status)
{
case ERROR:
@@ -92,24 +92,22 @@
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody tune = ConnectionTuneBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
+ AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ Integer.MAX_VALUE, // channelMax
+ getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeRequest(evt.getChannelId(), tune, stateManager);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- authResult.challenge); // challenge
+ AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ authResult.challenge); // challenge
protocolSession.writeRequest(evt.getChannelId(), challenge, stateManager);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -42,8 +42,7 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
{
ConnectionTuneOkBody body = evt.getMethod();
@@ -51,7 +50,7 @@
{
_logger.debug(body);
}
- stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
protocolSession.initHeartbeats(body.heartbeat);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Jan 19 14:19:51 2007
@@ -60,14 +60,11 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- byte major = (byte)0;
- byte minor = (byte)9;
+ byte major = protocolSession.getMajor();
+ byte minor = protocolSession.getMinor();
ExchangeBoundBody body = evt.getMethod();
@@ -78,7 +75,7 @@
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ Exchange exchange = protocolSession.getExchangeRegistry().getExchange(exchangeName);
AMQMethodBody response;
if (exchange == null)
{
@@ -111,7 +108,7 @@
}
else
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
@@ -143,7 +140,7 @@
}
else if (queueName != null)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Jan 19 14:19:51 2007
@@ -53,8 +53,7 @@
exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
{
final ExchangeDeclareBody body = evt.getMethod();
@@ -62,6 +61,7 @@
{
_logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
}
+ ExchangeRegistry exchangeRegistry = protocolSession.getExchangeRegistry();
synchronized(exchangeRegistry)
{
Exchange exchange = exchangeRegistry.getExchange(body.exchange);
@@ -75,10 +75,10 @@
}
if(!body.nowait)
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody((byte)0, (byte)9);
+ AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor()); // AMQP minor version
protocolSession.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Jan 19 14:19:51 2007
@@ -45,18 +45,17 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
{
ExchangeDeleteBody body = evt.getMethod();
try
{
- exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ protocolSession.getExchangeRegistry().unregisterExchange(body.exchange, body.ifUnused);
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
}
catch (ExchangeInUseException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageAppendHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageAppendBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java Fri Jan 19 14:19:51 2007
@@ -44,10 +44,7 @@
private MessageCancelHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageCancelBody> evt)
throws AMQException
{
@@ -55,10 +52,10 @@
final MessageCancelBody body = evt.getMethod();
channel.unsubscribeConsumer(protocolSession, body.destination);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9);
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor()); // AMQP minor version
protocolSession.writeResponse(evt, methodBody);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageCheckpointHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageCheckpointBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageCloseHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageCloseBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Fri Jan 19 14:19:51 2007
@@ -53,10 +53,7 @@
private MessageConsumeHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession session,
+ public void methodReceived (AMQProtocolSession session,
AMQMethodEvent<MessageConsumeBody> evt)
throws AMQException
{
@@ -71,19 +68,21 @@
}
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : session.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- channelClose(session, channelId, stateManager,
- "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
+ session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
+ "No such queue, '" + body.queue + "'");
+// channelClose(session, channelId, stateManager,
+// "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
}
else
{
- connectionClose(session, channelId, stateManager,
+ connectionClose(session, channelId, session.getStateManager(),
"No queue name provided, no default queue defined.",
AMQConstant.NOT_ALLOWED);
}
@@ -94,10 +93,10 @@
{
/*AMQShort*/String destination = channel.subscribeToQueue
(body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getMajor(), // AMQP major version
+ session.getMinor())); // AMQP minor version
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
@@ -105,11 +104,13 @@
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
+ session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage());
+// channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
}
catch (ConsumerTagNotUniqueException e)
{
- connectionClose(session, channelId, stateManager,
+ connectionClose(session, channelId, session.getStateManager(),
"Non-unique consumer tag, '" + body.destination + "'",
AMQConstant.NOT_ALLOWED);
}
@@ -117,37 +118,37 @@
}
}
- private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
- String message, AMQConstant code)
- throws AMQException
- {
- /*AMQShort*/String msg = new /*AMQShort*/String(message);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeRequest(channelId, ChannelCloseBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
- MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
- code.getCode(), // replyCode
- msg), // replyText
- listener);
- }
+// private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
+// String message, AMQConstant code)
+// throws AMQException
+// {
+// /*AMQShort*/String msg = new /*AMQShort*/String(message);
+// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+// // Be aware of possible changes to parameter order as versions change.
+// session.writeRequest(channelId, ChannelCloseBody.createMethodBody
+// ((byte)0, (byte)9, // AMQP version (major, minor)
+// MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
+// MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
+// code.getCode(), // replyCode
+// msg), // replyText
+// listener);
+// }
private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
String message, AMQConstant code)
throws AMQException
{
+ byte major = session.getMajor();
+ byte minor = session.getMinor();
/*AMQShort*/String msg = new /*AMQShort*/String(message);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeRequest(channelId, ConnectionCloseBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
- MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
- code.getCode(), // replyCode
- msg), // replyText
+ session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
+ major, minor, // AMQP version (major, minor)
+ MessageConsumeBody.getClazz(major, minor), // classId
+ MessageConsumeBody.getMethod(major, minor), // methodId
+ code.getCode(), // replyCode
+ msg), // replyText
listener);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageEmptyHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageEmptyBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Fri Jan 19 14:19:51 2007
@@ -40,10 +40,7 @@
private MessageGetHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageGetBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageOffsetHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageOffsetBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageOkHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageOkBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageOpenHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageOpenBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Fri Jan 19 14:19:51 2007
@@ -42,18 +42,15 @@
private MessageQosHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageQosBody> evt)
throws AMQException
{
protocolSession.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageRecoverHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageRecoverBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageRejectHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageRejectBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
private MessageResumeHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageResumeBody> evt)
throws AMQException
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Fri Jan 19 14:19:51 2007
@@ -51,10 +51,7 @@
private MessageTransferHandler() {}
- public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ public void methodReceived (AMQProtocolSession protocolSession,
AMQMethodEvent<MessageTransferBody> evt)
throws AMQException
{
@@ -68,29 +65,32 @@
if (body.destination == null) {
body.destination = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
- Exchange e = exchangeRegistry.getExchange(body.destination);
+ Exchange e = protocolSession.getExchangeRegistry().getExchange(body.destination);
// if the exchange does not exist we raise a channel exception
if (e == null) {
- protocolSession.closeChannel(evt.getChannelId());
- // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
- // then we can remove the hardcoded 0,0
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody cf = ChannelCloseBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- MessageTransferBody.getClazz((byte)0, (byte)9), // classId
- MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
- 500, // replyCode
- UNKNOWN_EXCHANGE_NAME); // replyText
- protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+// protocolSession.closeChannel(evt.getChannelId());
+// // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
+// // then we can remove the hardcoded 0,0
+// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+// // Be aware of possible changes to parameter order as versions change.
+// AMQMethodBody cf = ChannelCloseBody.createMethodBody
+// ((byte)0, (byte)9, // AMQP version (major, minor)
+// MessageTransferBody.getClazz((byte)0, (byte)9), // classId
+// MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
+// 500, // replyCode
+// UNKNOWN_EXCHANGE_NAME); // replyText
+// protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+ protocolSession.closeChannelRequest(evt.getChannelId(), 500, UNKNOWN_EXCHANGE_NAME);
} else {
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.addMessageTransfer(body, protocolSession);
- protocolSession.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, MessageOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Jan 19 14:19:51 2007
@@ -50,8 +50,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
final QueueBindBody body = evt.getMethod();
@@ -70,14 +69,14 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = protocolSession.getQueueRegistry().getQueue(body.queue);
}
if (queue == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ final Exchange exch = protocolSession.getExchangeRegistry().getExchange(body.exchange);
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
@@ -90,10 +89,10 @@
}
if (!body.nowait)
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody response = QueueBindOkBody.createMethodBody((byte)0, (byte)9);
+ final AMQMethodBody response = QueueBindOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor()); // AMQP minor version
protocolSession.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Jan 19 14:19:51 2007
@@ -65,8 +65,7 @@
_store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
QueueDeclareBody body = evt.getMethod();
@@ -78,6 +77,7 @@
}
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+ QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
synchronized (queueRegistry)
{
AMQQueue queue;
@@ -91,7 +91,7 @@
queueRegistry.registerQueue(queue);
if (autoRegister)
{
- Exchange defaultExchange = exchangeRegistry.getExchange("amq.direct");
+ Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
defaultExchange.registerQueue(body.queue, queue, null);
queue.bind(body.queue, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
@@ -102,14 +102,13 @@
}
if (!body.nowait)
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQMethodBody response = QueueDeclareOkBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- 0L, // consumerCount
- 0L, // messageCount
- body.queue); // queue
+ AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor(), // AMQP minor version
+ 0L, // consumerCount
+ 0L, // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeResponse(evt, response);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Jan 19 14:19:51 2007
@@ -57,7 +57,7 @@
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
@@ -67,7 +67,7 @@
}
else
{
- queue = queues.getQueue(body.queue);
+ queue = session.getQueueRegistry().getQueue(body.queue);
}
if(queue == null)
@@ -81,12 +81,11 @@
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
_store.removeQueue(queue.getName());
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeResponse(evt, QueueDeleteOkBody.createMethodBody
- ((byte)0, (byte)9, // AMQP version (major, minor)
- purged)); // messageCount
+ session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
+ session.getMajor(), // AMQP major version
+ session.getMinor(), // AMQP minor version
+ purged)); // messageCount
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Jan 19 14:19:51 2007
@@ -44,18 +44,17 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Jan 19 14:19:51 2007
@@ -44,17 +44,16 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.rollback();
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(protocolSession);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Fri Jan 19 14:19:51 2007
@@ -43,14 +43,13 @@
{
}
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody(
+ protocolSession.getMajor(), // AMQP major version
+ protocolSession.getMinor())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jan 19 14:19:51 2007
@@ -29,6 +29,8 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
@@ -40,6 +42,7 @@
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -270,6 +273,15 @@
return requestManager.sendRequest(methodBody, methodListener);
}
+ // This version uses this session's instance of AMQStateManager as the listener
+ public long writeRequest(int channelNum, AMQMethodBody methodBody)
+ throws AMQException
+ {
+ AMQChannel channel = getChannel(channelNum);
+ RequestManager requestManager = channel.getRequestManager();
+ return requestManager.sendRequest(methodBody, _stateManager);
+ }
+
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
throws AMQException
{
@@ -371,7 +383,8 @@
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
- public void closeChannel(int channelId) throws AMQException
+ // Used to close a channel as a response to a client close request
+ public void closeChannelResponse(int channelId, long requestId) throws AMQException
{
final AMQChannel channel = _channelMap.get(channelId);
if (channel == null)
@@ -383,6 +396,8 @@
try
{
channel.close(this);
+ // Be aware of possible changes to parameter order as versions change.
+ writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor));
}
finally
{
@@ -390,6 +405,28 @@
}
}
}
+
+ // Used to close a channel from the server side and inform the client
+ public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ {
+ final AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Unknown channel id");
+ }
+ else
+ {
+ channel.close(this);
+ // Be aware of possible changes to parameter order as versions change.
+ AMQMethodBody cf = ChannelCloseBody.createMethodBody
+ (_major, _minor, // AMQP version (major, minor)
+ MessageTransferBody.getClazz((byte)0, (byte)9), // classId
+ MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
+ replyCode, // replyCode
+ replyText); // replyText
+ writeRequest(channelId, cf);
+ }
+ }
/**
* In our current implementation this is used by the clustering code.
@@ -510,6 +547,16 @@
_clientProperties = clientProperties;
}
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
public AMQStateManager getStateManager()
{
return _stateManager;
@@ -520,12 +567,12 @@
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
- public byte getAmqpMajor()
+ public byte getMajor()
{
return _major;
}
- public byte getAmqpMinor()
+ public byte getMinor()
{
return _minor;
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Jan 19 14:19:51 2007
@@ -25,6 +25,9 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import javax.security.sasl.SaslServer;
@@ -73,10 +76,13 @@
* <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
* </ul>
* @param channelId id of the channel to close
+ * @param requestId id of the request that initiated the close, used in response
* @throws org.apache.qpid.AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
- void closeChannel(int channelId) throws AMQException;
+ void closeChannelResponse(int channelId, long requestId) throws AMQException;
+
+ void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
/**
* Remove a channel from the session but do not close it.
@@ -124,4 +130,11 @@
FieldTable getClientProperties();
void setClientProperties(FieldTable clientProperties);
+
+ QueueRegistry getQueueRegistry();
+ ExchangeRegistry getExchangeRegistry();
+ AMQStateManager getStateManager();
+ byte getMajor();
+ byte getMinor();
+ boolean amqpVersionEquals(byte major, byte minor);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Jan 19 14:19:51 2007
@@ -178,7 +178,7 @@
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
+ handler.methodReceived(_protocolSession, evt);
return true;
}
return false;
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Fri Jan 19 14:19:51 2007
@@ -34,7 +34,7 @@
*/
public interface StateAwareMethodListener <B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<B> evt) throws AMQException;
+ void methodReceived(AMQProtocolSession protocolSession,
+ AMQMethodEvent<B> evt)
+ throws AMQException;
}