You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [6/19] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/
dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/
dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -26,11 +26,7 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,7 +48,7 @@
private static final int DEFAULT_FRAME_SIZE = 65536;
- public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
+ public static ConnectionStartOkMethodHandler getInstance()
{
return _instance;
}
@@ -61,51 +57,51 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- final ConnectionStartOkBody body = evt.getMethod();
- _logger.info("SASL Mechanism selected: " + body.mechanism);
- _logger.info("Locale selected: " + body.locale);
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();//session.getVirtualHost().getAuthenticationManager();
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
if (ss == null)
{
- throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.mechanism
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()
);
}
session.setSaslServer(ss);
- AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
//save clientProperties
if (session.getClientProperties() == null)
{
- session.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.getClientProperties());
}
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+
switch (authResult.status)
{
case ERROR:
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte) 8, (byte) 0), // classId
- ConnectionCloseBody.getMethod((byte) 8, (byte) 0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- session.writeFrame(close);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ session.writeFrame(closeBody.generateFrame(0));
disposeSaslServer(session);
break;
@@ -114,25 +110,17 @@
session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- session.writeFrame(tune);
+
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+ getConfiguredFrameSize(),
+ HeartbeatConfig.getInstance().getDelay());
+ session.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- authResult.challenge); // challenge
- session.writeFrame(challenge);
+
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ session.writeFrame(secureBody.generateFrame(0));
}
}
catch (SaslException e)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -40,15 +40,15 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ConnectionTuneOkBody body = evt.getMethod();
+
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.initHeartbeats(body.heartbeat);
+ session.initHeartbeats(body.getHeartbeat());
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Feb 8 02:09:37 2008
@@ -21,10 +21,8 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,6 +34,8 @@
/**
* @author Apache Software Foundation
+ *
+ *
*/
public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
{
@@ -64,35 +64,32 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MethodRegistry methodRegistry = session.getMethodRegistry();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- byte major = (byte)8;
- byte minor = (byte)0;
- ExchangeBoundBody body = evt.getMethod();
- AMQShortString exchangeName = body.exchange;
- AMQShortString queueName = body.queue;
- AMQShortString routingKey = body.routingKey;
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
if (exchangeName == null)
{
throw new AMQException(null, "Exchange exchange must not be null", null);
}
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- AMQFrame response;
+ ExchangeBoundOkBody response;
if (exchange == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- EXCHANGE_NOT_FOUND, // replyCode
- new AMQShortString("Exchange " + exchangeName + " not found")); // replyText
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ new AMQShortString("Exchange " + exchangeName + " not found"));
}
else if (routingKey == null)
{
@@ -100,18 +97,12 @@
{
if (exchange.hasBindings())
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_BINDINGS, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
null); // replyText
}
}
@@ -121,28 +112,22 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
if (exchange.isBound(queue))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_BOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
}
}
@@ -153,53 +138,43 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
- if (exchange.isBound(body.routingKey, queue))
+ if (exchange.isBound(body.getRoutingKey(), queue))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName)); // replyText
+ body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
}
}
}
else
{
- if (exchange.isBound(body.routingKey))
+ if (exchange.isBound(body.getRoutingKey()))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key " + body.routingKey +
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
" to exchange " + exchangeName)); // replyText
}
}
- session.writeFrame(response);
+ session.writeFrame(response.generateFrame(channelId));
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Feb 8 02:09:37 2008
@@ -24,9 +24,7 @@
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
@@ -54,61 +52,60 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
- final ExchangeDeclareBody body = evt.getMethod();
+
if (_logger.isDebugEnabled())
{
- _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
}
synchronized(exchangeRegistry)
{
- Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+ Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
if (exchange == null)
{
- if(body.passive && ((body.type == null) || body.type.length() ==0))
+ if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
}
else
{
try
{
- exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
- body.type == null ? null : body.type.intern(),
- body.durable,
- body.passive, body.ticket);
+ exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+ body.getType() == null ? null : body.getType().intern(),
+ body.getDurable(),
+ body.getPassive());
exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
}
}
}
- else if (!exchange.getType().equals(body.type))
+ else if (!exchange.getType().equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
}
}
- if(!body.nowait)
+ if(!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Feb 8 02:09:37 2008
@@ -45,21 +45,20 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- ExchangeDeleteBody body = evt.getMethod();
+
try
{
- exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+ exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+
+ session.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeInUseException e)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Feb 8 02:09:37 2008
@@ -23,9 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -53,22 +51,24 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- final QueueBindBody body = evt.getMethod();
+
final AMQQueue queue;
- if (body.queue == null)
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
queue = channel.getDefaultQueue();
@@ -78,41 +78,42 @@
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
- if (body.routingKey == null)
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = queue.getName();
+ }
+ else
{
- body.routingKey = queue.getName();
+ routingKey = body.getRoutingKey().intern();
}
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
- if (body.routingKey != null)
- {
- body.routingKey = body.routingKey.intern();
- }
try
{
- if (!exch.isBound(body.routingKey, body.arguments, queue))
+ if (!exch.isBound(routingKey, body.getArguments(), queue))
{
- queue.bind(body.routingKey, body.arguments, exch);
+ queue.bind(routingKey, body.getArguments(), exch);
}
}
catch (AMQInvalidRoutingKeyException rke)
{
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
}
catch (AMQException e)
{
@@ -121,15 +122,14 @@
if (_log.isInfoEnabled())
{
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Feb 8 02:09:37 2008
@@ -27,10 +27,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.configuration.Configurator;
@@ -69,7 +66,7 @@
Configurator.configure(this);
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
@@ -77,12 +74,20 @@
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore store = virtualHost.getMessageStore();
- QueueDeclareBody body = evt.getMethod();
+
+
+
+ final AMQShortString queueName;
// if we aren't given a queue name, we create one which we return to the client
- if (body.queue == null)
+
+ if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+ {
+ queueName = createName();
+ }
+ else
{
- body.queue = createName();
+ queueName = body.getQueue().intern();
}
AMQQueue queue;
@@ -91,21 +96,17 @@
synchronized (queueRegistry)
{
- if (((queue = queueRegistry.getQueue(body.queue)) == null))
+ if (((queue = queueRegistry.getQueue(queueName)) == null))
{
- if(body.queue != null)
- {
- body.queue = body.queue.intern();
- }
- if (body.passive)
+ if (body.getPassive())
{
- String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
{
- queue = createQueue(body, virtualHost, session);
+ queue = createQueue(queueName,body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
//DTX MessageStore
@@ -122,42 +123,40 @@
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
+ queue.bind(queueName, null, defaultExchange);
+ _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
}
}
}
else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another client ID('"
+ queue.getOwner() + "')");
}
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- queue.getConsumerCount(), // consumerCount
- queue.getMessageCount(), // messageCount
- body.queue); // queue
- _log.info("Queue " + body.queue + " declared successfully");
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getMessageCount(),
+ queue.getConsumerCount());
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ _log.info("Queue " + queueName + " declared successfully");
}
}
@@ -171,15 +170,18 @@
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
+ protected AMQQueue createQueue(final AMQShortString queueName,
+ QueueDeclareBody body,
+ VirtualHost virtualHost,
+ final AMQProtocolSession session)
throws AMQException
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
- AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
- final AMQShortString queueName = queue.getName();
+ AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+ final AMQQueue queue = new AMQQueue(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost);
+
- if (body.exclusive && !body.durable)
+ if (body.getExclusive() && !body.getDurable())
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Feb 8 02:09:37 2008
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -58,22 +59,21 @@
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore store = virtualHost.getMessageStore();
- QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
- if (body.queue == null)
+ if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//get the default queue on the channel:
@@ -81,31 +81,31 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if (queue == null)
{
if (_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
{
- if (body.ifEmpty && !queue.isEmpty())
+ if (body.getIfEmpty() && !queue.isEmpty())
{
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
}
- else if (body.ifUnused && !queue.isUnused())
+ else if (body.getIfUnused() && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
if (queue.isDurable())
{
@@ -119,13 +119,10 @@
throw new AMQException(null, "problem when destroying queue " + queue, e);
}
}
-
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- purged)); // messageCount
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ session.writeFrame(responseBody.generateFrame(channelId));
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Fri Feb 8 02:09:37 2008
@@ -24,6 +24,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,22 +57,22 @@
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
+
- QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//get the default queue on the channel:
@@ -86,14 +88,14 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
@@ -101,14 +103,13 @@
long purged = queue.clearQueue(channel.getStoreContext());
- if(!body.nowait)
+ if(!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- purged)); // messageCount
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,110 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueUnbindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+
+ try
+ {
+ queue.unBind(routingKey, body.getArguments(), exch);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+ }
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl implements MethodDispatcher
+{
+ private final AMQStateManager _stateManager;
+
+ private static interface DispatcherFactory
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+ }
+
+ private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+ new HashMap<ProtocolVersion, DispatcherFactory>();
+
+
+ static
+ {
+ _dispatcherFactories.put(ProtocolVersion.v8_0,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_8_0(stateManager);
+ }
+ });
+
+ _dispatcherFactories.put(ProtocolVersion.v0_9,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_0_9(stateManager);
+ }
+ });
+
+ }
+
+
+ private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
+ private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
+ private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
+ private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
+ private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
+ private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
+ private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
+ private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
+ private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
+ private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
+ private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
+ private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
+ private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
+ private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
+ private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
+ private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
+ private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
+ private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
+ private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
+ private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
+ private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
+ private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
+ private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
+ private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
+ private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
+ private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
+ private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+
+
+
+ public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+ {
+ return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
+ }
+
+
+ public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+
+ protected AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+
+
+ public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+ {
+ _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+ {
+ _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+ {
+ _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+ {
+ _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+ {
+ _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+ {
+ _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+ {
+ _basicQosHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+ {
+ _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+ {
+ _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ {
+ _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ {
+ _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+ _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+ {
+ _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+
+ public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+
+ public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+ {
+ _queueBindHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+ {
+ _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+ {
+ _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+ {
+ _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ {
+ _txCommitHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ {
+ _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ {
+ _txSelectHandler.methodReceived(_stateManager, body, channelId);
+ return true;
+ }
+
+
+
+
+}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+
+public class ServerMethodDispatcherImpl_0_9
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_0_9
+
+{
+
+ private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
+ BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
+
+ public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
+ }
+}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl_8_0
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_8_0
+{
+ public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Feb 8 02:09:37 2008
@@ -24,6 +24,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -45,7 +47,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
@@ -53,25 +55,26 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Commit received on channel " + evt.getChannelId());
+ _log.debug("Commit received on channel " + channelId);
}
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
channel.commit();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
channel.processReturns(session);
}
catch (AMQException e)
{
- throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+ throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
}
}