You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/22 14:15:14 UTC
svn commit: r521253 [7/10] - in /incubator/qpid/branches/java.multi_version:
./ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
gentools/templ.cpp/class/ gentools/templ.cpp/field/
gentools/templ.cpp/method/ gentools/templ.cpp/model/ gentools...
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Thu Mar 22 06:14:42 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.amqp_8_0.ExchangeBoundOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -74,9 +75,9 @@
ExchangeBoundBody body = evt.getMethod();
- AMQShortString exchangeName = body.exchange;
- AMQShortString queueName = body.queue;
- AMQShortString routingKey = body.routingKey;
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
if (exchangeName == null)
{
throw new AMQException("Exchange exchange must not be null");
@@ -86,8 +87,8 @@
if (exchange == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
EXCHANGE_NOT_FOUND, // replyCode
new AMQShortString("Exchange " + exchangeName + " not found")); // replyText
}
@@ -98,16 +99,16 @@
if (exchange.hasBindings())
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
NO_BINDINGS, // replyCode
null); // replyText
}
@@ -119,8 +120,8 @@
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
@@ -129,16 +130,16 @@
if (exchange.isBound(queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_BOUND, // replyCode
new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
}
@@ -151,52 +152,58 @@
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
- if (exchange.isBound(body.routingKey, queue))
+ if (exchange.isBound(body.getRoutingKey(), queue))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName)); // replyText
+ body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
}
}
}
else
{
- if (exchange.isBound(body.routingKey))
+ if (exchange.isBound(body.getRoutingKey()))
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
OK, // replyCode
null); // replyText
}
else
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
+ response = createExchangeBoundResponseFrame(evt.getChannelId(),
+ // AMQP version (major, minor)
NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key " + body.routingKey +
+ new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
" to exchange " + exchangeName)); // replyText
}
}
session.writeFrame(response);
+ }
+
+ private AMQFrame createExchangeBoundResponseFrame(int channelId, int replyCode, AMQShortString replyText)
+ {
+ ExchangeBoundOkBody okBody = new ExchangeBoundOkBodyImpl(replyCode,replyText);
+ return new AMQFrame(channelId,okBody);
}
}
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Thu Mar 22 06:14:42 2007
@@ -64,43 +64,43 @@
final ExchangeDeclareBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
- _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
}
synchronized(exchangeRegistry)
{
- Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+ Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
if (exchange == null)
{
- if(body.passive && ((body.type == null) || body.type.length() ==0))
+ if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
}
else
{
try
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
- body.passive, body.ticket);
+ exchange = exchangeFactory.createExchange(body.getExchange(), body.getType(), body.getDurable(),
+ body.getPassive(), body.getTicket());
exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
}
}
}
- else if (!exchange.getType().equals(body.type))
+ else if (!exchange.getType().equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
- if(!body.nowait)
+ if(!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Thu Mar 22 06:14:42 2007
@@ -54,7 +54,7 @@
ExchangeDeleteBody body = evt.getMethod();
try
{
- exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
+ exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Thu Mar 22 06:14:42 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
@@ -61,8 +62,10 @@
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
final QueueBindBody body = evt.getMethod();
+
+ AMQShortString routingKey = body.getRoutingKey();
final AMQQueue queue;
- if (body.queue == null)
+ if (body.getQueue() == null)
{
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -77,33 +80,35 @@
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
-
- if (body.routingKey == null)
+
+
+
+ if (routingKey == null)
{
- body.routingKey = queue.getName();
+ routingKey = queue.getName();
}
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
try
{
- queue.bind(body.routingKey, body.arguments, exch);
+ queue.bind(routingKey, body.getArguments(), exch);
}
catch (AMQInvalidRoutingKeyException rke)
{
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
}
catch (AMQException e)
{
@@ -112,9 +117,9 @@
if (_log.isInfoEnabled())
{
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Mar 22 06:14:42 2007
@@ -78,10 +78,12 @@
QueueDeclareBody body = evt.getMethod();
+ AMQShortString queueName = body.getQueue();
+
// if we aren't given a queue name, we create one which we return to the client
- if (body.queue == null)
+ if (body.getQueue() == null)
{
- body.queue = createName();
+ queueName = createName();
}
AMQQueue queue;
@@ -90,11 +92,11 @@
synchronized (queueRegistry)
{
- if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+ if (((queue = queueRegistry.getQueue(queueName)) == null) )
{
- if(body.passive)
+ if(body.getPassive())
{
- String msg = "Queue: " + body.queue + " not found.";
+ String msg = "Queue: " + queueName + " not found.";
throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
}
else
@@ -109,8 +111,8 @@
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ queue.bind(queueName, null, defaultExchange);
+ _log.info("Queue " + body.getQueue() + " bound to default exchange");
}
}
}
@@ -130,7 +132,7 @@
channel.setDefaultQueue(queue);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -139,8 +141,8 @@
(byte)8, (byte)0, // AMQP version (major, minor)
queue.getConsumerCount(), // consumerCount
queue.getMessageCount(), // messageCount
- body.queue); // queue
- _log.info("Queue " + body.queue + " declared successfully");
+ queueName); // queue
+ _log.info("Queue " + queueName + " declared successfully");
session.writeFrame(response);
}
}
@@ -159,11 +161,11 @@
throws AMQException
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
- AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+ final AMQQueue queue = new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost);
final AMQShortString queueName = queue.getName();
- if(body.exclusive && !body.durable)
+ if(body.getExclusive() && !body.getDurable())
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Thu Mar 22 06:14:42 2007
@@ -65,7 +65,7 @@
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -79,31 +79,31 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
{
- if(body.ifEmpty && !queue.isEmpty())
+ if(body.getIfEmpty() && !queue.isEmpty())
{
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty." );
}
- else if(body.ifUnused && !queue.isUnused())
+ else if(body.getIfUnused() && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used." );
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
store.removeQueue(queue.getName());
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Thu Mar 22 06:14:42 2007
@@ -44,7 +44,7 @@
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
if (channel == null)
@@ -65,14 +65,14 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
@@ -80,7 +80,7 @@
long purged = queue.clearQueue(channel.getStoreContext());
- if(!body.nowait)
+ if(!body.getNowait())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Thu Mar 22 06:14:42 2007
@@ -23,6 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxRollbackBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,10 +58,10 @@
}
channel.rollback();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+ TxRollbackBody rollbackBody = createTxRollbackBody();
+ session.writeFrame(new AMQFrame(evt.getChannelId(), rollbackBody));
+
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
@@ -69,5 +71,10 @@
{
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
+ }
+
+ private TxRollbackBody createTxRollbackBody()
+ {
+ return new TxRollbackBodyImpl();
}
}
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Thu Mar 22 06:14:42 2007
@@ -23,6 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxSelectBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -55,9 +57,12 @@
channel.setLocalTransactional();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+ TxSelectBody txSelect = createTxSelectBody();
+ session.writeFrame(new AMQFrame(evt.getChannelId(), txSelect));
+ }
+
+ private TxSelectBody createTxSelectBody()
+ {
+ return new TxSelectBodyImpl();
}
}
Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Mar 22 06:14:42 2007
@@ -44,6 +44,7 @@
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartBodyImpl;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -210,9 +211,9 @@
_logger.debug("Frame Received: " + frame);
}
- if (body instanceof AMQMethodBody)
+ if (body instanceof AMQMethodBodyImpl)
{
- methodFrameReceived(channelId, (AMQMethodBody)body);
+ methodFrameReceived(channelId, (AMQMethodBodyImpl)body);
}
else if (body instanceof ContentHeaderBody)
{
@@ -247,14 +248,13 @@
String locales = "en_US";
+ ConnectionStartBody connectionStartBody = createConnectionStartBody(pi,
+ locales.getBytes(),
+ mechanisms.getBytes(),
+ null);
+
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ AMQFrame response = new AMQFrame((short) 0,connectionStartBody); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -272,11 +272,19 @@
}
}
+ private ConnectionStartBody createConnectionStartBody(ProtocolInitiation pi,
+ byte[] locales,
+ byte[] mechanisms,
+ FieldTable serverProperties)
+ {
+ return new ConnectionStartBodyImpl(pi._protocolMajor, pi._protocolMinor,serverProperties,mechanisms,locales);
+ }
+
- private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ private void methodFrameReceived(int channelId, AMQMethodBodyImpl methodBody)
{
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
+ final AMQMethodEvent<AMQMethodBodyImpl> evt = new AMQMethodEvent<AMQMethodBodyImpl>(channelId,
methodBody);
//Check that this channel is not closing
@@ -736,4 +744,8 @@
}
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
}
Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java Thu Mar 22 06:14:42 2007
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
public class AMQNoMethodHandlerException extends AMQException
{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBodyImpl> evt)
{
super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
}
Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Mar 22 06:14:42 2007
@@ -40,6 +40,7 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
@@ -218,18 +219,11 @@
*/
public void closeConnection() throws JMException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
- _session.getProtocolMajorVersion(),
- _session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ ConnectionCloseBody connectionCloseBody =
+ createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
);
- _session.writeFrame(response);
+ _session.writeFrame(new AMQFrame(0,connectionCloseBody));
try
{
@@ -239,6 +233,11 @@
{
throw new MBeanException(ex, ex.toString());
}
+ }
+
+ private ConnectionCloseBody createConnectionCloseBody(int code, AMQShortString replyText)
+ {
+ return new ConnectionCloseBodyImpl(code,replyText,0,0);
}
@Override
Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Mar 22 06:14:42 2007
@@ -21,7 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -125,7 +125,7 @@
try
{
- AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ AMQBodyImpl cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Thu Mar 22 06:14:42 2007
@@ -27,35 +27,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Thu Mar 22 06:14:42 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Mar 22 06:14:42 2007
@@ -25,7 +25,8 @@
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,6 +37,8 @@
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
@@ -92,7 +95,7 @@
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
- private AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandlerImpl _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
@@ -273,7 +276,7 @@
_failoverPolicy = new FailoverPolicy(connectionURL);
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandlerImpl(this);
// We are not currently connected
_connected = false;
@@ -550,26 +553,15 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // define this here, should be passed in
+ final int prefetchSize = 0;
- // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQMethodFactory methodFactory = getAMQMethodFactory();
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
-
- //todo send low water mark when protocol allows.
- //todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ ChannelOpenBody openBody = methodFactory.createChannelOpen();
+ sendCommandReceiveResponse(channelId, openBody);
+ AMQMethodBody qosBody = methodFactory.createMessageQos(prefetchHigh, prefetchSize);
+ sendCommandReceiveResponse(channelId, qosBody);
if (transacted)
{
@@ -578,14 +570,21 @@
_logger.debug("Issuing TxSelect for " + channelId);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- TxSelectOkBody.class);
+ TxSelectBody txSelectBody = methodFactory.createTxSelect();
+ sendCommandReceiveResponse(channelId, txSelectBody);
}
}
+ private AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(channelId, command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getProtocolOutputHandler().getAMQMethodFactory();
+ }
+
private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
{
try
@@ -934,7 +933,7 @@
return _virtualHost;
}
- public AMQProtocolHandler getProtocolHandler()
+ public AMQProtocolHandlerImpl getProtocolHandler()
{
return _protocolHandler;
}
@@ -1217,5 +1216,10 @@
public void performConnectionTask(Runnable task)
{
_taskPool.execute(task);
+ }
+
+ public ProtocolOutputHandler getProtocolOutputHandler()
+ {
+ return _protocolHandler.getOutputHandler();
}
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar 22 06:14:42 2007
@@ -59,6 +59,7 @@
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -68,41 +69,12 @@
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -197,6 +169,7 @@
private boolean _suspended;
private final Object _suspensionLock = new Object();
+ private static final AMQShortString CHANNEL_CLOSE_REPLY_TEXT = new AMQShortString("JMS client closing channel");
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
@@ -271,11 +244,11 @@
{
if (message.getDeliverBody() != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.getDeliverBody().getConsumerTag() + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
@@ -513,14 +486,11 @@
i.next().acknowledgeLastDelivered();
}
- // Commits outstanding messages sent and outstanding acknowledgements.
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQProtocolHandler handler = getProtocolHandler();
-
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion()),
- TxCommitOkBody.class);
+ TxCommitBody commitBody = getAMQMethodFactory().createTxCommit();
+ sendCommandReceiveResponse(commitBody);
+
+
+
}
catch (AMQException e)
{
@@ -549,8 +519,8 @@
suspendChannel(true);
}
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ TxRollbackBody rollbackBody = getAMQMethodFactory().createTxRollback();
+ sendCommandReceiveResponse(rollbackBody);
if (_dispatcher != null)
{
@@ -590,15 +560,12 @@
{
getProtocolHandler().closeSession(this);
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ ChannelCloseBody closeBody = getAMQMethodFactory().createChannelClose(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ CHANNEL_CLOSE_REPLY_TEXT);
+ sendCommandReceiveResponse(closeBody, timeout);
+
+
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -617,21 +584,17 @@
}
}
- private AMQProtocolHandler getProtocolHandler()
+ private AMQProtocolHandlerImpl getProtocolHandler()
{
return _connection.getProtocolHandler();
}
-
- private byte getProtocolMinorVersion()
+ public ProtocolOutputHandler getProtocolOutputHandler()
{
- return getProtocolHandler().getProtocolMinorVersion();
+ return _connection.getProtocolOutputHandler();
}
- private byte getProtocolMajorVersion()
- {
- return getProtocolHandler().getProtocolMajorVersion();
- }
+
/**
@@ -835,14 +798,12 @@
consumer.clearUnackedMessages();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
+ final boolean requeue = false;
+ AMQMethodBody recoverBody = getAMQMethodFactory().createRecover(requeue);
+ sendCommandReceiveResponse(recoverBody);
+
+
+
if (_dispatcher != null)
{
@@ -1226,136 +1187,60 @@
public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
{
- declareExchange(name, type, getProtocolHandler());
+ ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+ sendCommandReceiveResponse(exchangeDeclare);
}
public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
- getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
- }
-
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
- {
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
- }
-
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
- {
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
+ ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+ sendCommandReceiveResponse(exchangeDeclare);
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
-
public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
{
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+ QueueDeclareBody queueDeclare = getAMQMethodFactory().createQueueDeclare(name, null, autoDelete, durable, exclusive, false ,getTicket());
+ sendCommandReceiveResponse(queueDeclare);
}
public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
-
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ QueueBindBody queueBind = getAMQMethodFactory().createQueueBind(queueName,exchangeName,routingKey,arguments,getTicket());
+ sendCommandReceiveResponse(queueBind);
}
/**
* Declare the queue.
*
* @param amqd
- * @param protocolHandler
*
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
*
* @throws AMQException
*/
- private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+ private AMQShortString declareQueue(AMQDestination amqd) throws AMQException
{
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
// the destination indicates whether it wants a name generated or not.
if (amqd.isNameRequired())
{
- amqd.setQueueName(protocolHandler.generateQueueName());
+ amqd.setQueueName(getProtocolHandler().generateQueueName());
}
//TODO verify the destiation is valid. else throw
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
+ createQueue(amqd.getAMQQueueName(),amqd.isAutoDelete(),amqd.isDurable(),amqd.isExclusive());
+
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
return amqd.getAMQQueueName();
}
- private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+ private void bindQueue(AMQDestination amqd, AMQShortString queueName, FieldTable ft) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- ft, // arguments
- amqd.getExchangeName(), // exchange
- false, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- getTicket()); // ticket
-
-
- protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
+ bindQueue(queueName,amqd.getRoutingKey(),ft,amqd.getExchangeName());
}
/**
@@ -1365,8 +1250,7 @@
*
* @return the consumer tag generated by the broker
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector) throws AMQException
+ private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
@@ -1392,25 +1276,24 @@
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
- if (nowait)
+ final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE;
+
+ AMQMethodBody consumeBody = getAMQMethodFactory().createConsumer(tag,
+ queueName,
+ arguments,
+ noAck,
+ consumer.isExclusive(),
+ consumer.isNoLocal(),
+ getTicket());
+ if(nowait)
{
- protocolHandler.writeFrame(jmsConsume);
+ sendCommand(consumeBody);
}
else
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ sendCommandReceiveResponse(consumeBody);
}
+
}
catch (AMQException e)
{
@@ -1606,15 +1489,9 @@
{
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
- getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+
+ QueueDeleteBody deleteBody = getAMQMethodFactory().createQueueDelete(queueName, false, false, getTicket());
+ sendCommandReceiveResponse(deleteBody);
}
catch (AMQException e)
{
@@ -1697,23 +1574,23 @@
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
AMQMethodEvent response = null;
try
{
- response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ ExchangeBoundBody exchangeBoundBody =
+ getAMQMethodFactory().createExchangeBound(exchangeName, // exchange
+ queueName, // queue
+ routingKey); // routingKey
+
+
+ ExchangeBoundOkBody responseBody = sendCommandReceiveResponse(exchangeBoundBody, ExchangeBoundOkBody.class);
+ return (responseBody.getReplyCode() == 0);
}
catch (AMQException e)
{
throw new JMSAMQException(e);
}
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
- return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
+
}
private void checkTransacted() throws JMSException
@@ -1770,13 +1647,13 @@
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
+ message.getBounceBody().getExchange(),
+ message.getBounceBody().getRoutingKey(),
message.getContentHeader(),
message.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode());
+ AMQShortString reason = message.getBounceBody().getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -1812,16 +1689,12 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- deliveryTag, // deliveryTag
- multiple); // multiple
+ AMQMethodBody ackBody = getAMQMethodFactory().createAcknowledge(deliveryTag, multiple);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- getProtocolHandler().writeFrame(ackFrame);
+ sendCommand(ackBody);
}
public int getDefaultPrefetch()
@@ -1908,17 +1781,15 @@
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = getProtocolHandler();
-
- declareExchange(amqd, protocolHandler);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass());
- AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ AMQShortString queueName = declareQueue(amqd);
- bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+ bindQueue(amqd, queueName, consumer.getRawSelectorFieldTable());
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ consumeFromQueue(consumer, queueName, nowait, consumer.getMessageSelector());
}
catch (JMSException e) //thrown by getMessageSelector
{
@@ -2019,14 +1890,9 @@
_suspended = suspend;
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- !suspend); // active
+ AMQMethodBody flowBody = getAMQMethodFactory().createChannelFlow(!suspend);
+ sendCommandReceiveResponse(flowBody);
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
}
@@ -2118,32 +1984,15 @@
public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
{
- getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- active,
- exclusive,
- passive,
- read,
- realm,
- write),
- new BlockingMethodFrameListener(_channelId)
- {
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
- {
- if (frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody) frame).getTicket());
- return true;
- }
- else
- {
- return false;
- }
- }
- });
+ AccessRequestBody accessRequest = getAMQMethodFactory().createAccessRequest(active,exclusive,passive,read,realm,write);
+ AccessRequestOkBody okBody = getProtocolOutputHandler().sendCommandReceiveResponse(_channelId,accessRequest, AccessRequestOkBody.class );
+ setTicket(okBody.getTicket());
+ }
+ AMQMethodFactory getAMQMethodFactory()
+ {
+ return getProtocolOutputHandler().getAMQMethodFactory();
}
private class SuspenderRunner implements Runnable
@@ -2187,7 +2036,7 @@
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
+ if (consumerTag == null || message.getDeliverBody().getConsumerTag().equals(consumerTag))
{
if (_logger.isTraceEnabled())
{
@@ -2196,7 +2045,7 @@
messages.remove();
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue);
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
@@ -2209,13 +2058,27 @@
public void rejectMessage(long deliveryTag, boolean requeue)
{
- AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- deliveryTag,
- requeue);
+ AMQMethodBody rejectBody = getAMQMethodFactory().createRejectBody(deliveryTag, requeue);
+ sendCommand(rejectBody);
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
}
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(AMQMethodBody command, Class<T> responseClass) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, responseClass);
+ }
+ AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command);
+ }
+ AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command, long timeout) throws AMQException
+ {
+ return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, timeout);
+ }
+
+
+ void sendCommand(AMQMethodBody command)
+ {
+ getProtocolOutputHandler().sendCommand(_channelId, command);
+ }
}