You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/16 14:03:12 UTC

svn commit: r508382 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: ritchiem
Date: Fri Feb 16 05:03:11 2007
New Revision: 508382

URL: http://svn.apache.org/viewvc?view=rev&rev=508382
Log:
QPID-373 {Queue|Exchange}{Bind|Declare} should be synchronous to correctly receive/handle errors

Updated AMQSession to be synchronous

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=508382&r1=508381&r2=508382
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb 16 05:03:11 2007
@@ -58,6 +58,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSBytesMessage;
@@ -98,6 +99,8 @@
 import org.apache.qpid.framing.TxCommitOkBody;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -129,44 +132,32 @@
     private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
 
     /**
-     * Used to reference durable subscribers so they requests for unsubscribe can be handled
-     * correctly.  Note this only keeps a record of subscriptions which have been created
-     * in the current instance.  It does not remember subscriptions between executions of the
-     * client
+     * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly.  Note this only
+     * keeps a record of subscriptions which have been created in the current instance.  It does not remember
+     * subscriptions between executions of the client
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
 
-    /**
-     * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
-     * feature.
-     */
+    /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
     private int _nextTag = 1;
 
-    /**
-     * This queue is bounded and is used to store messages before being dispatched to the consumer
-     */
+    /** This queue is bounded and is used to store messages before being dispatched to the consumer */
     private final FlowControllingBlockingQueue _queue;
 
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
 
-    /**
-     * Set of all producers created by this session
-     */
+    /** Set of all producers created by this session */
     private Map _producers = new ConcurrentHashMap();
 
-    /**
-     * Maps from consumer tag (String) to JMSMessageConsumer instance
-     */
+    /** Maps from consumer tag (String) to JMSMessageConsumer instance */
     private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
-    /**
-     * Maps from destination to count of JMSMessageConsumers
-     */
+    /** Maps from destination to count of JMSMessageConsumers */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
             new ConcurrentHashMap<Destination, AtomicInteger>();
 
@@ -183,18 +174,17 @@
     protected static final boolean DEFAULT_MANDATORY = true;
 
     /**
-     * The counter of the next producer id. This id is generated by the session and used only to allow the
-     * producer to identify itself to the session when deregistering itself.
-     * <p/>
-     * Access to this id does not require to be synchronized since according to the JMS specification only one
-     * thread of control is allowed to create producers for any given session instance.
+     * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
+     * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
+     * synchronized since according to the JMS specification only one thread of control is allowed to create producers
+     * for any given session instance.
      */
     private long _nextProducerId;
 
 
     /**
-     * Set when recover is called. This is to handle the case where recover() is called by application code
-     * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+     * Set when recover is called. This is to handle the case where recover() is called by application code during
+     * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
      */
     private boolean _inRecovery;
 
@@ -203,16 +193,12 @@
     private boolean _hasMessageListeners;
 
 
-    /**
-     * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
-     */
+    /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
 
     private class Dispatcher extends Thread
     {
 
-        /**
-         * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
-         */
+        /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
         private final AtomicBoolean _closed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
@@ -303,16 +289,16 @@
                                                                                               message.getContentHeader(),
                                                                                               message.getBodies());
 
-                    int errorCode = message.getBounceBody().replyCode;
+                    AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
                     AMQShortString reason = message.getBounceBody().replyText;
                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                     //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+                    if (errorCode == AMQConstant.NO_CONSUMERS)
                     {
                         _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
-                    else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+                    else if (errorCode == (AMQConstant.NO_ROUTE)
                     {
                         _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
                     }
@@ -649,8 +635,7 @@
     }
 
     /**
-     * Called when the server initiates the closure of the session
-     * unilaterally.
+     * Called when the server initiates the closure of the session unilaterally.
      *
      * @param e the exception that caused this session to be closed. Null causes the
      */
@@ -676,10 +661,8 @@
     }
 
     /**
-     * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
-     * failover when the client has veoted resubscription.
-     * <p/>
-     * The caller of this method must already hold the failover mutex.
+     * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+     * when the client has vetoed resubscription. <p/> The caller of this method must already hold the failover mutex.
      */
     void markClosed()
     {
@@ -920,7 +903,9 @@
      * Creates a QueueReceiver
      *
      * @param destination
+     *
      * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
      * @throws JMSException
      */
     public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
@@ -936,7 +921,9 @@
      *
      * @param destination
      * @param messageSelector
+     *
      * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
      * @throws JMSException
      */
     public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
@@ -1105,6 +1092,10 @@
                     ex.setLinkedException(ise);
                     throw ex;
                 }
+                catch (AMQInvalidRoutingKeyException e)
+                {
+                    throw new InvalidDestinationException(amqd.getRoutingKey().toString());
+                }
                 catch (AMQException e)
                 {
                     JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -1155,7 +1146,7 @@
     }
 
 
-    public void declareExchange(AMQShortString name, AMQShortString type)
+    public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
     {
         declareExchange(name, type, getProtocolHandler());
     }
@@ -1177,12 +1168,12 @@
         getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
     }
 
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
     }
 
-    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler)
+    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
     {
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1192,11 +1183,12 @@
                                                                       false,    // durable
                                                                       name,    // exchange
                                                                       false,    // internal
-                                                                      true,    // nowait
+                                                                      false,    // nowait
                                                                       false,    // passive
                                                                       getTicket(),    // ticket
                                                                       type);    // type
-        protocolHandler.writeFrame(exchangeDeclare);
+
+        protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
     /**
@@ -1204,7 +1196,9 @@
      *
      * @param amqd
      * @param protocolHandler
+     *
      * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+     *
      * @throws AMQException
      */
     private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
@@ -1217,6 +1211,8 @@
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
 
+        //TODO verify the destination is valid, else throw
+
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
                                                                 getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
@@ -1224,12 +1220,12 @@
                                                                 amqd.isAutoDelete(),    // autoDelete
                                                                 amqd.isDurable(),    // durable
                                                                 amqd.isExclusive(),    // exclusive
-                                                                true,    // nowait
+                                                                false,    // nowait
                                                                 false,    // passive
                                                                 amqd.getAMQQueueName(),    // queue
                                                                 getTicket());    // ticket
 
-        protocolHandler.writeFrame(queueDeclare);
+        protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
         return amqd.getAMQQueueName();
     }
 
@@ -1240,18 +1236,20 @@
                                                           getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
                                                           ft,    // arguments
                                                           amqd.getExchangeName(),    // exchange
-                                                          true,    // nowait
+                                                          false,    // nowait
                                                           queueName,    // queue
                                                           amqd.getRoutingKey(),    // routingKey
                                                           getTicket());    // ticket
 
-        protocolHandler.writeFrame(queueBind);
+
+        protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
     }
 
     /**
      * Register to consume from the queue.
      *
      * @param queueName
+     *
      * @return the consumer tag generated by the broker
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
@@ -1336,7 +1334,9 @@
      * Creates a QueueReceiver wrapping a MessageConsumer
      *
      * @param queue
+     *
      * @return QueueReceiver
+     *
      * @throws JMSException
      */
     public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1352,7 +1352,9 @@
      *
      * @param queue
      * @param messageSelector
+     *
      * @return QueueReceiver
+     *
      * @throws JMSException
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
@@ -1399,7 +1401,9 @@
      * Creates a non-durable subscriber
      *
      * @param topic
+     *
      * @return TopicSubscriber - a wrapper round our MessageConsumer
+     *
      * @throws JMSException
      */
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1416,7 +1420,9 @@
      * @param topic
      * @param messageSelector
      * @param noLocal
+     *
      * @return TopicSubscriber - a wrapper round our MessageConsumer
+     *
      * @throws JMSException
      */
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
@@ -1493,9 +1499,7 @@
         }
     }
 
-    /**
-     * Note, currently this does not handle reuse of the same name with different topics correctly.
-     */
+    /** Note, currently this does not handle reuse of the same name with different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
             throws JMSException
     {
@@ -1606,8 +1610,8 @@
     }
 
     /**
-     * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
-     * Puts the message onto the queue read by the dispatcher.
+     * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+     * the queue read by the dispatcher.
      *
      * @param message the message that has been received
      */
@@ -1622,13 +1626,12 @@
     }
 
     /**
-     * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
-     * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
-     * AUTO_ACK or similar.
+     * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
+     * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
      *
      * @param deliveryTag the tag of the last message to be acknowledged
-     * @param multiple    if true will acknowledge all messages up to and including the one specified by the
-     *                    delivery tag
+     * @param multiple    if true will acknowledge all messages up to and including the one specified by the delivery
+     *                    tag
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
@@ -1710,7 +1713,7 @@
     {
         //stop the server delivering messages to this session
         suspendChannel();
-        
+
         if (_dispatcher != null)
         {
             _dispatcher.setConnectionStopped(true);
@@ -1721,6 +1724,7 @@
      * Callers must hold the failover mutex before calling this method.
      *
      * @param consumer
+     *
      * @throws AMQException
      */
     void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
@@ -1746,8 +1750,8 @@
     }
 
     /**
-     * Called by the MessageConsumer when closing, to deregister the consumer from the
-     * map from consumerTag to consumer instance.
+     * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+     * instance.
      *
      * @param consumer the consum
      */
@@ -1883,7 +1887,6 @@
     }
 
 
-
     public int getTicket()
     {
         return _ticket;
@@ -1898,30 +1901,30 @@
     public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
     {
         getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
-                                                                         getProtocolMajorVersion(),
-                                                                         getProtocolMinorVersion(),
-                                                                         active,
-                                                                         exclusive,
-                                                                         passive,
-                                                                         read,
-                                                                         realm,
-                                                                         write),
-                                       new BlockingMethodFrameListener(_channelId)
-                                       {
-
-                                           public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
-                                           {
-                                               if(frame instanceof AccessRequestOkBody)
-                                               {
-                                                    setTicket(((AccessRequestOkBody)frame).getTicket());
-                                                    return true;
-                                               }
-                                               else
-                                               {
-                                                    return false;
-                                               }
-                                           }
-                                       });
+                                                                                               getProtocolMajorVersion(),
+                                                                                               getProtocolMinorVersion(),
+                                                                                               active,
+                                                                                               exclusive,
+                                                                                               passive,
+                                                                                               read,
+                                                                                               realm,
+                                                                                               write),
+                                                              new BlockingMethodFrameListener(_channelId)
+                                                              {
+
+                                                                  public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+                                                                  {
+                                                                      if (frame instanceof AccessRequestOkBody)
+                                                                      {
+                                                                          setTicket(((AccessRequestOkBody) frame).getTicket());
+                                                                          return true;
+                                                                      }
+                                                                      else
+                                                                      {
+                                                                          return false;
+                                                                      }
+                                                                  }
+                                                              });
 
     }