You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [11/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker...

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Aug 14 20:40:49 2008
@@ -63,63 +63,54 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td>
  * </table>
  *
  * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
- *       example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
- *       fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
- *       the fail-over process, the retry handler could be used to automatically retry the operation once the connection
- *       has been reestablished. All fail-over protected operations should be placed in private methods, with
- *       FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
- *       fail-over process sets a nowait flag and uses an async method call instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
  * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
- *       after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
  */
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
 {
-    public static final class IdToConsumerMap
+
+
+    public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
-        private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
-
+        private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
 
-        public BasicMessageConsumer get(int id)
+        public C get(int id)
         {
-            if((id & 0xFFFFFFF0) == 0)
+            if ((id & 0xFFFFFFF0) == 0)
             {
-                return _fastAccessConsumers[id];
+                return (C) _fastAccessConsumers[id];
             }
             else
             {
@@ -127,12 +118,12 @@
             }
         }
 
-        public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+        public C put(int id, C consumer)
         {
-            BasicMessageConsumer oldVal;
-            if((id & 0xFFFFFFF0) == 0)
+            C oldVal;
+            if ((id & 0xFFFFFFF0) == 0)
             {
-                oldVal = _fastAccessConsumers[id];
+                oldVal = (C) _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = consumer;
             }
             else
@@ -144,13 +135,12 @@
 
         }
 
-
-        public BasicMessageConsumer remove(int id)
+        public C remove(int id)
         {
-            BasicMessageConsumer consumer;
-            if((id & 0xFFFFFFF0) == 0)
+            C consumer;
+            if ((id & 0xFFFFFFF0) == 0)
             {
-                 consumer = _fastAccessConsumers[id];
+                consumer = (C) _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = null;
             }
             else
@@ -162,15 +152,15 @@
 
         }
 
-        public Collection<BasicMessageConsumer> values()
+        public Collection<C> values()
         {
-            ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+            ArrayList<C> values = new ArrayList<C>();
 
-            for(int i = 0; i < 16; i++)
+            for (int i = 0; i < 16; i++)
             {
-                if(_fastAccessConsumers[i] != null)
+                if (_fastAccessConsumers[i] != null)
                 {
-                    values.add(_fastAccessConsumers[i]);
+                    values.add((C) _fastAccessConsumers[i]);
                 }
             }
             values.addAll(_slowAccessConsumers.values());
@@ -178,11 +168,10 @@
             return values;
         }
 
-
         public void clear()
         {
             _slowAccessConsumers.clear();
-            for(int i = 0; i<16; i++)
+            for (int i = 0; i < 16; i++)
             {
                 _fastAccessConsumers[i] = null;
             }
@@ -192,8 +181,6 @@
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
-    /** Used for debugging in the dispatcher. */
-    private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
 
     /** The default maximum number of prefetched message at which to suspend the channel. */
     public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -243,7 +230,6 @@
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
     protected int _channelId;
 
-    /** @todo This does not appear to be set? */
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -270,8 +256,8 @@
      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
      * up in the {@link #_subscriptions} map.
      */
-    protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-            new ConcurrentHashMap<BasicMessageConsumer, String>();
+    protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+            new ConcurrentHashMap<C, String>();
 
     /**
      * Used to hold incoming messages.
@@ -280,19 +266,13 @@
      */
     protected final FlowControllingBlockingQueue _queue;
 
-    /**
-     * Holds the highest received delivery tag.
-     */
+    /** Holds the highest received delivery tag. */
     private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
 
-    /**
-     * All the not yet acknowledged message tags
-     */
+    /** All the not yet acknowledged message tags */
     protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    /**
-     * All the delivered message tags
-     */
+    /** All the delivered message tags */
     protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
 
     /** Holds the dispatcher thread for this session. */
@@ -314,16 +294,16 @@
      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
      * consumer.
      */
-    protected final IdToConsumerMap _consumers = new IdToConsumerMap();
-    
-            //Map<AMQShortString, BasicMessageConsumer> _consumers =
-            //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+    protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+
+    //Map<AMQShortString, BasicMessageConsumer> _consumers =
+    //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /**
      * Contains a list of consumers which have been removed but which might still have
      * messages to acknowledge, eg in client ack or transacted modes
      */
-    private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+    private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -377,10 +357,8 @@
 
     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
     private boolean _dirty;
-    /** Has failover occured on this session */
-    private boolean _failedOver;
-
-
+    /** Has failover occurred on this session with outstanding actions to commit? */
+    private boolean _failedOverDirty;
 
     private static final class FlowControlIndicator
     {
@@ -388,7 +366,7 @@
 
         public synchronized void setFlowControl(boolean flowControl)
         {
-            _flowControl= flowControl;
+            _flowControl = flowControl;
             notify();
         }
 
@@ -412,7 +390,7 @@
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      */
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+    protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
@@ -445,21 +423,25 @@
                     new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
                                                      new FlowControllingBlockingQueue.ThresholdListener()
                                                      {
+                                                         private final AtomicBoolean _suspendState = new AtomicBoolean();
+
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
-                                                                         "Above threshold(" + _defaultPrefetchHighMark
-                                                                         + ") so suspending channel. Current value is " + currentValue);
-                                                                 new Thread(new SuspenderRunner(true)).start();
+                                                             _logger.debug(
+                                                                     "Above threshold(" + _defaultPrefetchHighMark
+                                                                     + ") so suspending channel. Current value is " + currentValue);
+                                                             _suspendState.set(true);
+                                                             new Thread(new SuspenderRunner(_suspendState)).start();
 
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
-                                                                         "Below threshold(" + _defaultPrefetchLowMark
-                                                                         + ") so unsuspending channel. Current value is " + currentValue);
-                                                                 new Thread(new SuspenderRunner(false)).start();
+                                                             _logger.debug(
+                                                                     "Below threshold(" + _defaultPrefetchLowMark
+                                                                     + ") so unsuspending channel. Current value is " + currentValue);
+                                                             _suspendState.set(false);
+                                                             new Thread(new SuspenderRunner(_suspendState)).start();
 
                                                          }
                                                      });
@@ -499,10 +481,30 @@
         close(-1);
     }
 
+    public void checkNotClosed() throws JMSException
+    {
+        try
+        {
+            super.checkNotClosed();
+        }
+        catch (IllegalStateException ise)
+        {
+            // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
+            AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+
+            if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+            {
+                ise.setLinkedException(manager.getLastException());
+            }
+
+            throw ise;
+        }
+    }
+
     public BytesMessage createBytesMessage() throws JMSException
     {
         checkNotClosed();
-        return new JMSBytesMessage();
+        return new JMSBytesMessage(getMessageDelegateFactory());
     }
 
     /**
@@ -515,7 +517,7 @@
         if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
-        } 
+        }
         else if (hasFailedOver())
         {
             throw new IllegalStateException("has failed over");
@@ -560,39 +562,35 @@
      * @param exchangeName The exchange to bind the queue on.
      *
      * @throws AMQException If the queue cannot be bound for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
-     *
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                          final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
+                          final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
+                sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
                 return null;
             }
         }, _connection).execute();
     }
 
-
-    public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+    public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
     {
-        if( consumer.getQueuename() != null)
+        if (consumer.getQueuename() != null)
         {
-            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
+            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
         }
     }
 
     public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
+                                       final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
 
     /**
-
      * Closes the session.
      *
      * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
@@ -602,14 +600,11 @@
      * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
      *
      * @throws JMSException If the JMS provider fails to close the session due to some internal error.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
-     *
      * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
-     *       re-opened. May need to examine this more carefully.
-     *
+     * re-opened. May need to examine this more carefully.
      * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
-     *       because the failover process sends the failover event before acquiring the mutex itself.
+     * because the failover process sends the failover event before acquiring the mutex itself.
      */
     public void close(long timeout) throws JMSException
     {
@@ -617,13 +612,13 @@
         {
             StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
             _logger.info("Closing session: " + this); // + ":"
-                         // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+            // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
         // Ensure we only try and close an open session.
         if (!_closed.getAndSet(true))
         {
-            synchronized (_connection.getFailoverMutex())
+            synchronized (getFailoverMutex())
             {
                 // We must close down all producers and consumers in an orderly fashion. This is the only method
                 // that can be called from a different thread of control from the one controlling the session.
@@ -634,7 +629,7 @@
 
                     try
                     {
-                       sendClose(timeout);
+                        sendClose(timeout);
                     }
                     catch (AMQException e)
                     {
@@ -685,7 +680,7 @@
 
         if (!_closed.getAndSet(true))
         {
-            synchronized (_connection.getFailoverMutex())
+            synchronized (getFailoverMutex())
             {
                 synchronized (_messageDeliveryLock)
                 {
@@ -701,7 +696,6 @@
                         amqe = new AMQException("Closing session forcibly", e);
                     }
 
-
                     _connection.deregisterSession(_channelId);
                     closeProducersAndConsumers(amqe);
                 }
@@ -719,12 +713,11 @@
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
     {
-    	checkTransacted();
+        checkTransacted();
 
         try
         {
@@ -743,6 +736,7 @@
             }
             // Commits outstanding messages and acknowledgments
             sendCommit();
+            markClean();
         }
         catch (AMQException e)
         {
@@ -756,11 +750,12 @@
 
     public abstract void sendCommit() throws AMQException, FailoverException;
 
-    public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+    public void confirmConsumerCancelled(int consumerTag)
     {
 
         // Remove the consumer from the map
-        BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+        C consumer = _consumers.get(consumerTag);
         if (consumer != null)
         {
             if (!consumer.isNoConsume())  // Normal Consumer
@@ -788,10 +783,8 @@
 
                 // consumer.markClosed();
 
-
-
                 if (consumer.isAutoClose())
-                {     
+                {
                     // There is a small window where the message is between the two queues in the dispatcher.
                     if (consumer.isClosed())
                     {
@@ -802,6 +795,10 @@
 
                         deregisterConsumer(consumer);
                     }
+                    else
+                    {
+                        _queue.add(new CloseConsumerMessage(consumer));
+                    }
                 }
             }
         }
@@ -847,7 +844,7 @@
                                   false, false);
     }
 
-    public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+    public C createExclusiveConsumer(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -855,7 +852,6 @@
                                   false, false);
     }
 
-
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
@@ -873,7 +869,6 @@
                                   messageSelector, null, false, false);
     }
 
-
     public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
@@ -883,7 +878,6 @@
                                   messageSelector, null, false, false);
     }
 
-
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
                                           String selector) throws JMSException
     {
@@ -920,7 +914,7 @@
     public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkNotClosed();
         checkValidTopic(topic);
@@ -929,8 +923,8 @@
             _subscriptions.get(name).close();
         }
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
-        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+        C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+        TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
@@ -940,7 +934,7 @@
     public MapMessage createMapMessage() throws JMSException
     {
         checkNotClosed();
-        return new JMSMapMessage();
+        return new JMSMapMessage(getMessageDelegateFactory());
     }
 
     public javax.jms.Message createMessage() throws JMSException
@@ -951,7 +945,7 @@
     public ObjectMessage createObjectMessage() throws JMSException
     {
         checkNotClosed();
-        return (ObjectMessage) new JMSObjectMessage();
+        return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
     }
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -962,23 +956,23 @@
         return msg;
     }
 
-    public BasicMessageProducer createProducer(Destination destination) throws JMSException
+    public P createProducer(Destination destination) throws JMSException
     {
         return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+    public P createProducer(Destination destination, boolean immediate) throws JMSException
     {
         return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+    public P createProducer(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+    public P createProducer(Destination destination, boolean mandatory, boolean immediate,
                                                boolean waitUntilSent) throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -988,7 +982,7 @@
     {
         checkNotClosed();
 
-        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
+        return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -1025,24 +1019,44 @@
      * @param exclusive  Flag to indicate that the queue is exclusive to this client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
                             final boolean exclusive) throws AMQException
     {
+        createQueue(name, autoDelete, durable, exclusive, null);
+    }
+
+    /**
+     * Declares the named queue.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param name       The name of the queue to declare.
+     * @param autoDelete
+     * @param durable    Flag to indicate that the queue is durable.
+     * @param exclusive  Flag to indicate that the queue is exclusive to this client.
+     * @param arguments  Arguments used to set special properties of the queue
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+                            final boolean exclusive, final Map<String, Object> arguments) throws AMQException
+    {
         new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendCreateQueue(name, autoDelete, durable, exclusive);
+                sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
                 return null;
             }
         }, _connection).execute();
     }
 
     public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
-            final boolean exclusive)throws AMQException, FailoverException;
+                                         final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException;
+
     /**
      * Creates a QueueReceiver
      *
@@ -1056,7 +1070,7 @@
     {
         checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+        C consumer = (C) createConsumer(destination);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1075,7 +1089,7 @@
     {
         checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+        C consumer = (C) createConsumer(destination, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1093,7 +1107,7 @@
     {
         checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+        C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1112,7 +1126,7 @@
     {
         checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+        C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1127,11 +1141,18 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        // This method needs to be improved. Throwables only arrive here from the mina : exceptionReceived
+        // calls through connection.closeAllSessions which is also called by the public connection.close()
+        // with a null cause
+        // When we are closing the Session due to a protocol session error we simply create a new AMQException
+        // with the correct error code and text; this is clearly WRONG as the instanceof check below will fail.
+        // We need to determine here if the connection should be
+
+        synchronized (getFailoverMutex())
         {
             checkNotClosed();
 
-            return new JMSStreamMessage();
+            return new JMSStreamMessage(getMessageDelegateFactory());
         }
     }
 
@@ -1150,10 +1171,9 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
     }
 
-
     /**
      * Creates a non-durable subscriber with a message selector
      *
@@ -1171,7 +1191,7 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
     }
 
     public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1185,14 +1205,19 @@
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized (getFailoverMutex())
         {
             checkNotClosed();
 
-            return new JMSTextMessage();
+            return new JMSTextMessage(getMessageDelegateFactory());
         }
     }
 
+    protected Object getFailoverMutex()
+    {
+        return _connection.getFailoverMutex();
+    }
+
     public TextMessage createTextMessage(String text) throws JMSException
     {
 
@@ -1340,17 +1365,8 @@
         {
             _logger.debug("Message[" + message.toString() + "] received in session");
         }
-
-        if (message instanceof ReturnMessage)
-        {
-            // Return of the bounced message.
-            returnBouncedMessage((ReturnMessage)message);
-        }
-        else
-        {
-            _highestDeliveryTag.set(message.getDeliveryTag());
-            _queue.add(message);
-        }
+        _highestDeliveryTag.set(message.getDeliveryTag());
+        _queue.add(message);
     }
 
     public void declareAndBind(AMQDestination amqd)
@@ -1360,7 +1376,7 @@
         AMQProtocolHandler protocolHandler = getProtocolHandler();
         declareExchange(amqd, protocolHandler, false);
         AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
     }
 
     /**
@@ -1375,7 +1391,7 @@
      * <li>Stop message delivery.</li>
      * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
      * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
-     *     Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+     * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
      * </ul>
      *
      * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
@@ -1429,7 +1445,7 @@
         }
     }
 
-    abstract void sendRecover() throws AMQException, FailoverException;
+    protected abstract void sendRecover() throws AMQException, FailoverException;
 
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
@@ -1465,7 +1481,6 @@
      * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
      *                      not mean that the rollback is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void rollback() throws JMSException
@@ -1507,8 +1522,7 @@
 
     public abstract void releaseForRollback();
 
-    public abstract void  sendRollback() throws AMQException, FailoverException ;
-
+    public abstract void sendRollback() throws AMQException, FailoverException;
 
     public void run()
     {
@@ -1576,7 +1590,7 @@
 
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             }
-            else
+            else // Queue Browser
             {
 
                 if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
@@ -1591,7 +1605,7 @@
         }
     }
 
-    protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+    protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
                                                  final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
                                                  final boolean noConsume, final boolean autoClose) throws JMSException
     {
@@ -1615,10 +1629,10 @@
             messageSelector = selector;
         }
 
-        return new FailoverRetrySupport<MessageConsumer, JMSException>(
-                new FailoverProtectedOperation<MessageConsumer, JMSException>()
+        return new FailoverRetrySupport<C, JMSException>(
+                new FailoverProtectedOperation<C, JMSException>()
                 {
-                    public MessageConsumer execute() throws JMSException, FailoverException
+                    public C execute() throws JMSException, FailoverException
                     {
                         checkNotClosed();
 
@@ -1630,13 +1644,19 @@
                         final FieldTable ft = FieldTableFactory.newFieldTable();
                         // if (rawSelector != null)
                         // ft.put("headers", rawSelector.getDataAsBytes());
-                        if (rawSelector != null)
+                        // rawSelector is used by HeadersExchange and is not a JMS Selector
+                        if (rawSelector != null) 
                         {
                             ft.addAll(rawSelector);
                         }
+                        
+                        if (messageSelector != null)
+                        {
+                            ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
+                        }
 
-                        BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
-                                noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
+                        C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+                                                                              noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -1679,9 +1699,9 @@
                 }, _connection).execute();
     }
 
-    public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
-            final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-            final boolean noConsume, final boolean autoClose) throws JMSException;
+    public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+                                                               final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
+                                                               final boolean noConsume, final boolean autoClose) throws JMSException;
 
     /**
      * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
@@ -1689,9 +1709,9 @@
      *
      * @param consumer the consum
      */
-    void deregisterConsumer(BasicMessageConsumer consumer)
+    void deregisterConsumer(C consumer)
     {
-        if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+        if (_consumers.remove(consumer.getConsumerTag()) != null)
         {
             String subscriptionName = _reverseSubscriptionMap.remove(consumer);
             if (subscriptionName != null)
@@ -1744,12 +1764,11 @@
      * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
      *
      * @throws JMSException If the query fails for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException;
-            
+
     public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
 
     /**
@@ -1771,7 +1790,10 @@
      */
     void resubscribe() throws AMQException
     {
-        _failedOver = true;
+        if (_dirty)
+        {
+            _failedOverDirty = true;
+        }
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -1790,10 +1812,9 @@
      * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
      *
      * @throws AMQException If the session cannot be started for any reason.
-     *
      * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
-     *       FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
-     *       for each subsequent call to flow.. only need to do this if we have called stop.
+     * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+     * for each subsequent call to flow.. only need to do this if we have called stop.
      */
     void start() throws AMQException
     {
@@ -1978,12 +1999,12 @@
     {
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+        final Iterator<C> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = it.next();
+            final C con = it.next();
             if (error != null)
             {
                 con.notifyError(error);
@@ -1994,7 +2015,7 @@
             }
         }
         // at this point the _consumers map will be empty
-         if (_dispatcher != null)
+        if (_dispatcher != null)
         {
             _dispatcher.close();
             _dispatcher = null;
@@ -2014,7 +2035,7 @@
         final Iterator it = clonedProducers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+            final P prod = (P) it.next();
             prod.close();
         }
         // at this point the _producers map is empty
@@ -2062,20 +2083,18 @@
      *
      * @param queueName
      */
-    private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+    private void consumeFromQueue(C consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
-        // need to generate a consumer tag on the client so we can exploit the nowait flag
-        AMQShortString tag = new AMQShortString(Integer.toString(tagId));
 
-        consumer.setConsumerTag(tag);
+        consumer.setConsumerTag(tagId);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tagId, consumer);
 
         try
         {
-            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
         }
         catch (AMQException e)
         {
@@ -2085,27 +2104,27 @@
         }
     }
 
-    public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
-            AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+    public abstract void sendConsume(C consumer, AMQShortString queueName,
+                                     AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
 
-    private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+    private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, false);
     }
 
-    private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+    private P createProducerImpl(final Destination destination, final boolean mandatory,
                                                     final boolean immediate, final boolean waitUntilSent) throws JMSException
     {
-        return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
-                new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+        return new FailoverRetrySupport<P, JMSException>(
+                new FailoverProtectedOperation<P, JMSException>()
                 {
-                    public BasicMessageProducer execute() throws JMSException, FailoverException
+                    public P execute() throws JMSException, FailoverException
                     {
                         checkNotClosed();
                         long producerId = getNextProducerId();
-                        BasicMessageProducer producer = createMessageProducer(destination, mandatory,
-                                immediate, waitUntilSent, producerId);
+                        P producer = createMessageProducer(destination, mandatory,
+                                                                              immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2113,21 +2132,20 @@
                 }, _connection).execute();
     }
 
-    public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, final boolean waitUntilSent, long producerId);
+    public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
+                                                               final boolean immediate, final boolean waitUntilSent, long producerId);
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
     }
 
-
     /**
      * Returns the number of messages currently queued for the given destination.
      *
      * <p/>Note that this operation automatically retries in the event of fail-over.
      *
-     * @param amqd            The destination to be checked
+     * @param amqd The destination to be checked
      *
      * @return the number of queued messages.
      *
@@ -2147,7 +2165,7 @@
 
     }
 
-    abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+    protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
 
     /**
      * Declares the named exchange and type of exchange.
@@ -2160,7 +2178,6 @@
      * @param nowait
      *
      * @throws AMQException If the exchange cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2177,8 +2194,7 @@
     }
 
     public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
-            final boolean nowait) throws AMQException, FailoverException;
-
+                                             final boolean nowait) throws AMQException, FailoverException;
 
     /**
      * Declares a queue for a JMS destination.
@@ -2196,9 +2212,7 @@
      *         the client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Verify the destiation is valid or throw an exception.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
@@ -2224,7 +2238,7 @@
                 }, _connection).execute();
     }
 
-    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
 
     /**
      * Undeclares the specified queue.
@@ -2234,7 +2248,6 @@
      * @param queueName The name of the queue to delete.
      *
      * @throws JMSException If the queue could not be deleted for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2256,7 +2269,7 @@
         }
     }
 
-    public abstract void sendQueueDelete(final AMQShortString queueName)  throws AMQException, FailoverException;
+    public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
 
     private long getNextProducerId()
     {
@@ -2292,12 +2305,12 @@
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+        final Iterator<C> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = it.next();
+            final C con = it.next();
             con.markClosed();
         }
         // at this point the _consumers map will be empty
@@ -2332,7 +2345,7 @@
      *
      * @throws AMQException
      */
-    private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+    private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
     {
         AMQDestination amqd = consumer.getDestination();
 
@@ -2345,8 +2358,7 @@
         // store the consumer queue name
         consumer.setQueuename(queueName);
 
-        // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)
@@ -2397,15 +2409,16 @@
 
     private void rejectAllMessages(boolean requeue)
     {
-        rejectMessagesForConsumerTag(null, requeue);
+        rejectMessagesForConsumerTag(0, requeue, true);
     }
 
     /**
      * @param consumerTag The consumerTag to prune from queue or all if null
      * @param requeue     Should the removed messages be requeued (or discarded. Possibly to DLQ)
+     * @param rejectAllConsumers If true, reject the messages of every consumer and ignore consumerTag
      */
 
-    private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+    private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
     {
         Iterator messages = _queue.iterator();
         if (_logger.isInfoEnabled())
@@ -2426,12 +2439,12 @@
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+            if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                        + message.getDeliveryTag());
+                                  + message.getDeliveryTag());
                 }
 
                 messages.remove();
@@ -2448,12 +2461,11 @@
 
     private void resubscribeConsumers() throws AMQException
     {
-        ArrayList consumers = new ArrayList(_consumers.values());
+        ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
         _consumers.clear();
 
-        for (Iterator it = consumers.iterator(); it.hasNext();)
-        {
-            BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+        for (C consumer : consumers)
+        {            
             consumer.failedOver();
             registerConsumer(consumer, true);
         }
@@ -2465,53 +2477,11 @@
         _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
         for (Iterator it = producers.iterator(); it.hasNext();)
         {
-            BasicMessageProducer producer = (BasicMessageProducer) it.next();
+            P producer = (P) it.next();
             producer.resubscribe();
         }
     }
 
-    private void returnBouncedMessage(final ReturnMessage msg)
-    {
-        _connection.performConnectionTask(new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    // Bounced message is processed here, away from the mina thread
-                    AbstractJMSMessage bouncedMessage =
-                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                            		msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
-                        AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
-                        AMQShortString reason = msg.getReplyText();
-                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
-                    // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS)
-                    {
-                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE)
-                    {
-                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else
-                    {
-                        _connection.exceptionReceived(
-                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
-                    }
-
-                }
-                catch (Exception e)
-                {
-                    _logger.error(
-                            "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
-                            e);
-                }
-            }
-        });
-    }
-
     /**
      * Suspends or unsuspends this session.
      *
@@ -2519,7 +2489,6 @@
      *                should be unsuspended.
      *
      * @throws AMQException If the session cannot be suspended for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
@@ -2560,7 +2529,6 @@
         return getAMQConnection().getMaxPrefetch() > 0;
     }
 
-
     /** Signifies that the session has pending sends to commit. */
     public void markDirty()
     {
@@ -2571,7 +2539,7 @@
     public void markClean()
     {
         _dirty = false;
-        _failedOver = false;
+        _failedOverDirty = false;
     }
 
     /**
@@ -2581,7 +2549,7 @@
      */
     public boolean hasFailedOver()
     {
-        return _failedOver;
+        return _failedOverDirty;
     }
 
     /**
@@ -2604,12 +2572,11 @@
         _flowControl.setFlowControl(active);
     }
 
-
     public void checkFlowControl() throws InterruptedException
     {
-        synchronized(_flowControl)
+        synchronized (_flowControl)
         {
-            while(!_flowControl.getFlowControl())
+            while (!_flowControl.getFlowControl())
             {
                 _flowControl.wait();
             }
@@ -2617,6 +2584,9 @@
 
     }
 
+    /** Used for debugging in the dispatcher. */
+    private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+    
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     class Dispatcher extends Thread
@@ -2629,6 +2599,8 @@
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
         private String dispatcherID = "" + System.identityHashCode(this);
 
+
+
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -2647,7 +2619,7 @@
 
         }
 
-        public void rejectPending(BasicMessageConsumer consumer)
+        public void rejectPending(C consumer)
         {
             synchronized (_lock)
             {
@@ -2662,7 +2634,7 @@
                 consumer.rollbackPendingMessages();
 
                 // Reject messages on pre-dispatch queue
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
                 //Let the dispatcher deal with this when it gets to them.
 
                 // closeConsumer
@@ -2689,7 +2661,7 @@
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
-                for (BasicMessageConsumer consumer : _consumers.values())
+                for (C consumer : _consumers.values())
                 {
                     if (!consumer.isNoConsume())
                     {
@@ -2755,7 +2727,8 @@
                             _lock.wait();
                         }
 
-                        if (tagLE(deliveryTag, _rollbackMark.get()))
+                        if (!(message instanceof CloseConsumerMessage)
+                            && tagLE(deliveryTag, _rollbackMark.get()))
                         {
                             rejectMessage(message, true);
                         }
@@ -2816,8 +2789,8 @@
             //This if block is not needed anymore as bounce messages are handled separately
             //if (message.getDeliverBody() != null)
             //{
-            final BasicMessageConsumer consumer =
-                _consumers.get(message.getConsumerTag().toIntValue());
+            final C consumer =
+                    _consumers.get(message.getConsumerTag());
 
             if ((consumer == null) || consumer.isClosed())
             {
@@ -2826,14 +2799,26 @@
                     if (consumer == null)
                     {
                         _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliveryTag() + "] from queue "
-                                + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
+                                               + message.getDeliveryTag() + "] from queue "
+                                               + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                     }
                     else
                     {
-                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliveryTag() + "] from queue " + " consumer("
-                                + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+                        if (consumer.isNoConsume())
+                        {
+                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
+                                                   + message.getConsumerTag() + ") is closed and a browser so dropping...");
+                            //DROP MESSAGE
+                            return;
+
+                        }
+                        else
+                        {
+                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
+                                                   + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+                        }
                     }
                 }
                 // Don't reject if we're already closing
@@ -2850,9 +2835,11 @@
         }
     }
 
-    abstract boolean tagLE(long tag1, long tag2);
+    protected abstract boolean tagLE(long tag1, long tag2);
+
+    protected abstract boolean updateRollbackMark(long current, long deliveryTag);
 
-    abstract boolean updateRollbackMark(long current, long deliveryTag);
+    public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
 
     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
         boolean read) throws AMQException
@@ -2880,9 +2867,9 @@
 
     private class SuspenderRunner implements Runnable
     {
-        private boolean _suspend;
+        private AtomicBoolean _suspend;
 
-        public SuspenderRunner(boolean suspend)
+        public SuspenderRunner(AtomicBoolean suspend)
         {
             _suspend = suspend;
         }
@@ -2891,7 +2878,10 @@
         {
             try
             {
-                suspendChannel(_suspend);
+                synchronized (_suspensionLock)
+                {
+                    suspendChannel(_suspend.get());
+                }
             }
             catch (AMQException e)
             {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Aug 14 20:40:49 2008
@@ -27,17 +27,18 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.util.Serial;
-import org.apache.qpidity.nclient.Session;
-import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.MessageCreditUnit;
-import org.apache.qpidity.transport.MessageFlowMode;
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Option;
-import org.apache.qpidity.transport.ExchangeBoundResult;
-import org.apache.qpidity.transport.Future;
+import org.apache.qpid.nclient.Session;
+import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,7 @@
 /**
  * This is a 0.10 Session
  */
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
 {
 
     /**
@@ -71,8 +72,11 @@
     private Object _currentExceptionLock = new Object();
     private QpidException _currentException;
 
-    // a ref on the qpidity connection
-    protected org.apache.qpidity.nclient.Connection _qpidConnection;
+    // a ref on the qpid connection
+    protected org.apache.qpid.nclient.Connection _qpidConnection;
+
+    private RangeSet unacked = new RangeSet();
+    private int unackedCount = 0;
 
     /**
      * USed to store the range of in tx messages
@@ -93,7 +97,7 @@
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      * @param qpidConnection          The qpid connection
      */
-    AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+    AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
                     int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
@@ -123,7 +127,7 @@
      * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
      * @param qpidConnection      The connection
      */
-    AMQSession_0_10(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+    AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
 
@@ -131,6 +135,18 @@
              defaultPrefetchHigh, defaultPrefetchLow);
     }
 
+    private void addUnacked(int id)
+    {
+        unacked.add(id);
+        unackedCount++;
+    }
+
+    private void clearUnacked()
+    {
+        unacked.clear();
+        unackedCount = 0;
+    }
+
     //------- overwritten methods of class AMQSession
 
     /**
@@ -140,6 +156,7 @@
      * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
      *                    delivery tag, <tt>false</tt> to just acknowledge that message.
      */
+
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         if (_logger.isDebugEnabled())
@@ -147,14 +164,13 @@
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
         }
         // acknowledge this message
-        RangeSet ranges = new RangeSet();
         if (multiple)
         {
             for (Long messageTag : _unacknowledgedMessageTags)
             {
                 if( messageTag <= deliveryTag )
                 {
-                    ranges.add((int) (long) messageTag);
+                    addUnacked(messageTag.intValue());
                     _unacknowledgedMessageTags.remove(messageTag);
                 }
             }
@@ -163,10 +179,26 @@
         }
         else
         {
-            ranges.add((int) deliveryTag);
+            addUnacked((int) deliveryTag);
             _unacknowledgedMessageTags.remove(deliveryTag);
         }
-        getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+        long prefetch = getAMQConnection().getMaxPrefetch();
+
+        if (unackedCount >= prefetch/2)
+        {
+            flushAcknowledgments();
+        }
+    }
+
+    void flushAcknowledgments()
+    {
+        if (unackedCount > 0)
+        {
+            getQpidSession().messageAcknowledge
+                (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+            clearUnacked();
+        }
     }
 
     /**
@@ -210,6 +242,7 @@
      */
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
+        flushAcknowledgments();
         getQpidSession().sync();
         getQpidSession().close();
         getCurrentException();
@@ -243,15 +276,16 @@
      * @param durable    If set when creating a new queue,
      *                   the queue will be marked as durable.
      * @param exclusive  Exclusive queues can only be used from one connection at a time.
+     * @param arguments  Additional arguments passed through to the queue declaration; may be null.
      * @throws AMQException
      * @throws FailoverException
      */
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
-                                final boolean exclusive) throws AMQException, FailoverException
+                                final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
     {
-        getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
-                                      autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
-                                      exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+        getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
+                                      autoDelete ? Option.AUTO_DELETE : Option.NONE,
+                                      exclusive ? Option.EXCLUSIVE : Option.NONE);
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
         getCurrentException();
@@ -311,7 +345,7 @@
     /**
      * Create an 0_10 message consumer
      */
-    public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+    public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
                                                       final int prefetchLow, final boolean noLocal,
                                                       final boolean exclusive, String messageSelector,
                                                       final FieldTable ft, final boolean noConsume,
@@ -372,8 +406,8 @@
      * This method is invoked when a consumer is creted
      * Registers the consumer with the broker
      */
-    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
-                            boolean nowait, String messageSelector, AMQShortString tag)
+    public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+                            boolean nowait, String messageSelector, int tag)
             throws AMQException, FailoverException
     {
         boolean preAcquire;
@@ -382,32 +416,34 @@
             preAcquire = ( ! consumer.isNoConsume()  &&
                     (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
                     || !(consumer.getDestination() instanceof AMQQueue);
-            getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+            getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
                                               getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
                                               preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
-                                              new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
-                                              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+                                              (BasicMessageConsumer_0_10) consumer, null,
+                                              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
         catch (JMSException e)
         {
             throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
         }
 
+        String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
         if (! prefetch())
         {
-            getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+            getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
         }
         else
         {
-            getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+            getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
         }
-        getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         // only if not immediat prefetch
         if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
         {
             // set the flow
-            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+            getQpidSession().messageFlow(consumerTag,
                                          MessageCreditUnit.MESSAGE,
                                          getAMQConnection().getMaxPrefetch());
         }
@@ -418,7 +454,7 @@
     /**
      * Create an 0_10 message producer
      */
-    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
                                                       final boolean immediate, final boolean waitUntilSent,
                                                       long producerId)
     {
@@ -476,9 +512,9 @@
             arguments.put("no-local", true);
         }
         getQpidSession().queueDeclare(res.toString(), null, arguments,
-                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
-                                      amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
-                                      !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+                                      amqd.isDurable() ? Option.DURABLE : Option.NONE,
+                                      !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         // passive --> false
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
@@ -508,13 +544,14 @@
         {
             for (BasicMessageConsumer consumer : _consumers.values())
             {
-                getQpidSession().messageStop(consumer.getConsumerTag().toString());
+                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
             }
         }
         else
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
+            for (BasicMessageConsumer_0_10 consumer : _consumers.values())
             {
+                String consumerTag = String.valueOf(consumer.getConsumerTag());
                 //only set if msg list is null
                 try
                 {
@@ -522,18 +559,18 @@
                     {
                         if (consumer.getMessageListener() != null)
                         {
-                            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+                            getQpidSession().messageFlow(consumerTag,
                                                          MessageCreditUnit.MESSAGE, 1);
                         }
                     }
                     else
                     {
                         getQpidSession()
-                            .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+                            .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
                                          getAMQConnection().getMaxPrefetch());
                     }
                     getQpidSession()
-                        .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+                        .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
                 }
                 catch (Exception e)
                 {
@@ -561,7 +598,7 @@
      *
      * @return The associated Qpid Session.
      */
-    protected org.apache.qpidity.nclient.Session getQpidSession()
+    protected org.apache.qpid.nclient.Session getQpidSession()
     {
         return _qpidSession;
     }
@@ -594,7 +631,7 @@
         try
         {
             // this is done so that we can produce to a temporary queue beofre we create a consumer
-            sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+            sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null);
             sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
             result.setQueueName(result.getRoutingKey());
         }
@@ -612,7 +649,7 @@
     /**
      * Lstener for qpid protocol exceptions
      */
-    private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
+    private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener
     {
         public void onClosed(ErrorCode errorCode, String reason, Throwable t)
         {
@@ -681,7 +718,7 @@
         AMQTopic origTopic=checkValidTopic(topic);
         AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
 
-        TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+        TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
         if (subscriber != null)
         {
             if (subscriber.getTopic().equals(topic))
@@ -732,7 +769,7 @@
             }
         }
 
-        subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+        subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
 
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -740,7 +777,7 @@
         return subscriber;
     }
 
-    Long requestQueueDepth(AMQDestination amqd)
+    protected Long requestQueueDepth(AMQDestination amqd)
     {
         return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
     }
@@ -757,10 +794,11 @@
         _txRangeSet.add((int) id);
         _txSize++;
         // this is a heuristic, we may want to have that configurable 
-        if( _txSize > _connection.getMaxPrefetch() / 2 )
+        if (_connection.getMaxPrefetch() == 1 ||
+                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
         {
-           // send completed so consumer credits don't dry up
-           getQpidSession().messageAcknowledge(_txRangeSet, false);
+            // send completed so consumer credits don't dry up
+            getQpidSession().messageAcknowledge(_txRangeSet, false);
         }
     }
 
@@ -787,14 +825,19 @@
         }
     }
 
-    final boolean tagLE(long tag1, long tag2)
+    protected final boolean tagLE(long tag1, long tag2)
     {
         return Serial.le((int) tag1, (int) tag2);
     }
 
-    final boolean updateRollbackMark(long currentMark, long deliveryTag)
+    protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
     {
         return Serial.lt((int) currentMark, (int) deliveryTag);
     }
 
+    public AMQMessageDelegateFactory getMessageDelegateFactory()
+    {
+        return AMQMessageDelegateFactory.FACTORY_0_10;
+    }
+
 }