You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/16 14:03:12 UTC
svn commit: r508382 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: ritchiem
Date: Fri Feb 16 05:03:11 2007
New Revision: 508382
URL: http://svn.apache.org/viewvc?view=rev&rev=508382
Log:
QPID-373 Queue|Exchange}{Bind|Declare} should be synchronous to correctly receive/handle error
Updated AMQSession to be synchronous
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=508382&r1=508381&r2=508382
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb 16 05:03:11 2007
@@ -58,6 +58,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
@@ -98,6 +99,8 @@
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -129,44 +132,32 @@
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
- /**
- * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
- * feature.
- */
+ /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
private int _nextTag = 1;
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
- /**
- * Set of all producers created by this session
- */
+ /** Set of all producers created by this session */
private Map _producers = new ConcurrentHashMap();
- /**
- * Maps from consumer tag (String) to JMSMessageConsumer instance
- */
+ /** Maps from consumer tag (String) to JMSMessageConsumer instance */
private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
- /**
- * Maps from destination to count of JMSMessageConsumers
- */
+ /** Maps from destination to count of JMSMessageConsumers */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -183,18 +174,17 @@
protected static final boolean DEFAULT_MANDATORY = true;
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the
- * producer to identify itself to the session when deregistering itself.
- * <p/>
- * Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
+ * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
+ * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
+ * synchronized since according to the JMS specification only one thread of control is allowed to create producers
+ * for any given session instance.
*/
private long _nextProducerId;
/**
- * Set when recover is called. This is to handle the case where recover() is called by application code
- * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * Set when recover is called. This is to handle the case where recover() is called by application code during
+ * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
@@ -203,16 +193,12 @@
private boolean _hasMessageListeners;
- /**
- * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
- */
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
+ /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
@@ -303,16 +289,16 @@
message.getContentHeader(),
message.getBodies());
- int errorCode = message.getBounceBody().replyCode;
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
AMQShortString reason = message.getBounceBody().replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+ if (errorCode == AMQConstant.NO_CONSUMERS)
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
- else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ else if (errorCode == (AMQConstant.NO_ROUTE)
{
_connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
}
@@ -649,8 +635,7 @@
}
/**
- * Called when the server initiates the closure of the session
- * unilaterally.
+ * Called when the server initiates the closure of the session unilaterally.
*
* @param e the exception that caused this session to be closed. Null causes the
*/
@@ -676,10 +661,8 @@
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
- * failover when the client has veoted resubscription.
- * <p/>
- * The caller of this method must already hold the failover mutex.
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+ * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
*/
void markClosed()
{
@@ -920,7 +903,9 @@
* Creates a QueueReceiver
*
* @param destination
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
@@ -936,7 +921,9 @@
*
* @param destination
* @param messageSelector
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
@@ -1105,6 +1092,10 @@
ex.setLinkedException(ise);
throw ex;
}
+ catch (AMQInvalidRoutingKeyException e)
+ {
+ throw new InvalidDestinationException(amqd.getRoutingKey().toString());
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -1155,7 +1146,7 @@
}
- public void declareExchange(AMQShortString name, AMQShortString type)
+ public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
{
declareExchange(name, type, getProtocolHandler());
}
@@ -1177,12 +1168,12 @@
getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1192,11 +1183,12 @@
false, // durable
name, // exchange
false, // internal
- true, // nowait
+ false, // nowait
false, // passive
getTicket(), // ticket
type); // type
- protocolHandler.writeFrame(exchangeDeclare);
+
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
/**
@@ -1204,7 +1196,9 @@
*
* @param amqd
* @param protocolHandler
+ *
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+ *
* @throws AMQException
*/
private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
@@ -1217,6 +1211,8 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
+ //TODO verify the destiation is valid. else throw
+
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
@@ -1224,12 +1220,12 @@
amqd.isAutoDelete(), // autoDelete
amqd.isDurable(), // durable
amqd.isExclusive(), // exclusive
- true, // nowait
+ false, // nowait
false, // passive
amqd.getAMQQueueName(), // queue
getTicket()); // ticket
- protocolHandler.writeFrame(queueDeclare);
+ protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
return amqd.getAMQQueueName();
}
@@ -1240,18 +1236,20 @@
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
ft, // arguments
amqd.getExchangeName(), // exchange
- true, // nowait
+ false, // nowait
queueName, // queue
amqd.getRoutingKey(), // routingKey
getTicket()); // ticket
- protocolHandler.writeFrame(queueBind);
+
+ protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
}
/**
* Register to consume from the queue.
*
* @param queueName
+ *
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
@@ -1336,7 +1334,9 @@
* Creates a QueueReceiver wrapping a MessageConsumer
*
* @param queue
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1352,7 +1352,9 @@
*
* @param queue
* @param messageSelector
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
@@ -1399,7 +1401,9 @@
* Creates a non-durable subscriber
*
* @param topic
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1416,7 +1420,9 @@
* @param topic
* @param messageSelector
* @param noLocal
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
@@ -1493,9 +1499,7 @@
}
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- */
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -1606,8 +1610,8 @@
}
/**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
- * Puts the message onto the queue read by the dispatcher.
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
*
* @param message the message that has been received
*/
@@ -1622,13 +1626,12 @@
}
/**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
- * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
- * AUTO_ACK or similar.
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
+ * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
*
* @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery
+ * tag
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
@@ -1710,7 +1713,7 @@
{
//stop the server delivering messages to this session
suspendChannel();
-
+
if (_dispatcher != null)
{
_dispatcher.setConnectionStopped(true);
@@ -1721,6 +1724,7 @@
* Callers must hold the failover mutex before calling this method.
*
* @param consumer
+ *
* @throws AMQException
*/
void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
@@ -1746,8 +1750,8 @@
}
/**
- * Called by the MessageConsumer when closing, to deregister the consumer from the
- * map from consumerTag to consumer instance.
+ * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+ * instance.
*
* @param consumer the consum
*/
@@ -1883,7 +1887,6 @@
}
-
public int getTicket()
{
return _ticket;
@@ -1898,30 +1901,30 @@
public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
{
getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- active,
- exclusive,
- passive,
- read,
- realm,
- write),
- new BlockingMethodFrameListener(_channelId)
- {
-
- public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
- {
- if(frame instanceof AccessRequestOkBody)
- {
- setTicket(((AccessRequestOkBody)frame).getTicket());
- return true;
- }
- else
- {
- return false;
- }
- }
- });
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ active,
+ exclusive,
+ passive,
+ read,
+ realm,
+ write),
+ new BlockingMethodFrameListener(_channelId)
+ {
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+ {
+ if (frame instanceof AccessRequestOkBody)
+ {
+ setTicket(((AccessRequestOkBody) frame).getTicket());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ });
}