You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/15 18:28:54 UTC

svn commit: r547730 [2/9] - in /incubator/qpid/trunk/qpid: ./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ java/broker/src/main/java/org/apache/qpid/server/queue/ ...

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jun 15 09:28:46 2007
@@ -20,47 +20,15 @@
  */
 package org.apache.qpid.client;
 
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSBytesMessage;
 import org.apache.qpid.client.message.JMSMapMessage;
@@ -70,21 +38,20 @@
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
 import org.apache.qpid.framing.BasicAckBody;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
 import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
 import org.apache.qpid.framing.ExchangeBoundBody;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -92,413 +59,618 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
 import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.QueueDeleteOkBody;
 import org.apache.qpid.framing.TxCommitBody;
 import org.apache.qpid.framing.TxCommitOkBody;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
+    /** Used for debugging. */
     private static final Logger _logger = Logger.getLogger(AMQSession.class);
 
+    /** Used for debugging in the dispatcher. */
+    private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
+    /** The default maximum number of prefetched message at which to suspend the channel. */
     public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+
+    /** The default minimum number of prefetched messages at which to resume the channel. */
     public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
 
+    /**
+     * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
+     * not need to be attached to a queue.
+     */
+    protected static final boolean DEFAULT_IMMEDIATE = false;
+
+    /**
+     * The default value for mandatory flag used by producers created by this session is true. That is, server will not
+     * silently drop messages where no queue is connected to the exchange for the message.
+     */
+    protected static final boolean DEFAULT_MANDATORY = true;
+
+    /** System property to enable strict AMQP compliance. */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
+
+    /** Strict AMQP default setting. */
+    public static final String STRICT_AMQP_DEFAULT = "false";
+
+    /** System property to enable failure if strict AMQP compliance is violated. */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+
+    /** Strickt AMQP failure default. */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+    /** System property to enable immediate message prefetching. */
+    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+
+    /** Immediate message prefetch default. */
+    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
+    /** The connection to which this session belongs. */
     private AMQConnection _connection;
 
+    /** Used to indicate whether or not this is a transactional session. */
     private boolean _transacted;
 
+    /** Holds the sessions acknowledgement mode. */
     private int _acknowledgeMode;
 
+    /** Holds this session unique identifier, used to distinguish it from other sessions. */
     private int _channelId;
 
+    /** @todo This does not appear to be set? */
     private int _ticket;
 
+    /** Holds the high mark for prefetched message, at which the session is suspended. */
     private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+
+    /** Holds the low mark for prefetched messages, below which the session is resumed. */
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
+    /** Holds the message listener, if any, which is attached to this session. */
     private MessageListener _messageListener = null;
 
+    /** Used to indicate that this session has been started at least once. */
     private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
 
     /**
-     * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly.  Note this only
-     * keeps a record of subscriptions which have been created in the current instance.  It does not remember
-     * subscriptions between executions of the client
+     * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly.  Note this only
+     * keeps a record of subscriptions which have been created in the current instance. It does not remember
+     * subscriptions between executions of the client.
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+
+    /**
+     * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked up
+     * in the {@link #_subscriptions} map.
+     */
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
 
-    /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
-    private int _nextTag = 1;
-
-    /** This queue is bounded and is used to store messages before being dispatched to the consumer */
+    /**
+     * Used to hold incoming messages.
+     *
+     * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
+     */
     private final FlowControllingBlockingQueue _queue;
 
+    /** Holds the dispatcher thread for this session. */
     private Dispatcher _dispatcher;
 
+    /** Holds the message factory factory for this session. */
     private MessageFactoryRegistry _messageFactoryRegistry;
 
-    /** Set of all producers created by this session */
-    private Map _producers = new ConcurrentHashMap();
-
-    /** Maps from consumer tag (String) to JMSMessageConsumer instance */
-    private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
-    /** Maps from destination to count of JMSMessageConsumers */
-    private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
-            new ConcurrentHashMap<Destination, AtomicInteger>();
+    /** Holds all of the producers created by this session, keyed by their unique identifiers. */
+    private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
 
     /**
-     * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
-     * need to be attached to a queue
+     * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
+     * methods.
      */
-    protected static final boolean DEFAULT_IMMEDIATE = false;
+    private int _nextTag = 1;
 
     /**
-     * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently
-     * drop messages where no queue is connected to the exchange for the message
+     * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
+     * consumer.
      */
-    protected static final boolean DEFAULT_MANDATORY = true;
+    private Map<AMQShortString, BasicMessageConsumer> _consumers =
+            new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+    /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
+    private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+            new ConcurrentHashMap<Destination, AtomicInteger>();
 
     /**
-     * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
-     * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
-     * synchronized since according to the JMS specification only one thread of control is allowed to create producers
-     * for any given session instance.
+     * Used as a source of unique identifiers for producers within the session.
+     *
+     * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one
+     * thread of control is allowed to create producers for any given session instance.
      */
     private long _nextProducerId;
 
-
     /**
      * Set when recover is called. This is to handle the case where recover() is called by application code during
-     * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+     * onMessage() processing to enure that an auto ack is not sent.
      */
     private boolean _inRecovery;
 
+    /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
 
+    /** Used to indicate that this session has a message listener attached to it. */
     private boolean _hasMessageListeners;
 
+    /** Used to indicate that this session has been suspended. */
     private boolean _suspended;
 
+    /**
+     * Used to protect the suspension of this session, so that critical code can be executed during suspension, without
+     * the session being resumed by other threads.
+     */
     private final Object _suspensionLock = new Object();
 
-    /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+    /**
+     * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel.
+     *
+     * @todo This is accessed only within a synchronized method, so does not need to be atomic.
+     */
     private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
-    /** System property to enable strickt AMQP compliance */
-    public static final String STRICT_AMQP = "STRICT_AMQP";
-    /** Strickt AMQP default */
-    public static final String STRICT_AMQP_DEFAULT = "false";
+    /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+    private final boolean _immediatePrefetch;
 
+    /** Indicates that warnings should be generated on violations of the strict AMQP. */
     private final boolean _strictAMQP;
 
-    /** System property to enable strickt AMQP compliance */
-    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
-    /** Strickt AMQP default */
-    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
+    /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
     private final boolean _strictAMQPFATAL;
 
-    /** System property to enable immediate message prefetching */
-    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
-    /** Immediate message prefetch default */
-    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+    /**
+     * Creates a new session on a connection.
+     *
+     * @param con                     The connection on which to create the session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is transactional.
+     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param messageFactoryRegistry  The message factory factory for the session.
+     * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+     * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    {
 
-    private final boolean _immediatePrefetch;
+        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+        _strictAMQPFATAL =
+                Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+        _immediatePrefetch =
+                _strictAMQP
+                || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
 
-    private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+        _connection = con;
+        _transacted = transacted;
+        if (transacted)
+        {
+            _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
+        }
+        else
+        {
+            _acknowledgeMode = acknowledgeMode;
+        }
 
-    /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
-    private class Dispatcher extends Thread
-    {
+        _channelId = channelId;
+        _messageFactoryRegistry = messageFactoryRegistry;
+        _defaultPrefetchHighMark = defaultPrefetchHighMark;
+        _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
-        /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
-        private final AtomicBoolean _closed = new AtomicBoolean(false);
+        if (_acknowledgeMode == NO_ACKNOWLEDGE)
+        {
+            _queue =
+                    new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+                                                     new FlowControllingBlockingQueue.ThresholdListener()
+                                                     {
+                                                         public void aboveThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Above threshold(" + _defaultPrefetchHighMark
+                                                                         + ") so suspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(true)).start();
+                                                             }
+                                                         }
+
+                                                         public void underThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Below threshold(" + _defaultPrefetchLowMark
+                                                                         + ") so unsuspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(false)).start();
+                                                             }
+                                                         }
+                                                     });
+        }
+        else
+        {
+            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+        }
+    }
 
-        private final Object _lock = new Object();
+    /**
+     * Creates a new session on a connection with the default message factory factory.
+     *
+     * @param con                 The connection on which to create the session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is transactional.
+     * @param acknowledgeMode     The acknoledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+               int defaultPrefetchLow)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+             defaultPrefetchLow);
+    }
 
-        public Dispatcher()
+    /**
+     * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
+     *
+     * @throws IllegalStateException If the session is closed.
+     */
+    public void acknowledge() throws IllegalStateException
+    {
+        if (isClosed())
         {
-            super("Dispatcher-Channel-" + _channelId);
-            if (_dispatcherLogger.isInfoEnabled())
-            {
-                _dispatcherLogger.info(getName() + " created");
-            }
+            throw new IllegalStateException("Session is already closed");
         }
 
-        public void run()
+        for (BasicMessageConsumer consumer : _consumers.values())
         {
-            if (_dispatcherLogger.isInfoEnabled())
-            {
-                _dispatcherLogger.info(getName() + " started");
-            }
+            consumer.acknowledge();
+        }
+    }
 
-            UnprocessedMessage message;
+    /**
+     * Acknowledge one or many messages.
+     *
+     * @param deliveryTag The tag of the last message to be acknowledged.
+     * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
+     *                    delivery tag, <tt>false</tt> to just acknowledge that message.
+     *
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+        final AMQFrame ackFrame =
+                BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+                                            multiple);
 
-            // Allow disptacher to start stopped
-            synchronized (_lock)
-            {
-                while (connectionStopped())
-                {
-                    try
-                    {
-                        _lock.wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        // ignore
-                    }
-                }
-            }
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+        }
 
-            try
-            {
-                while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
-                {
-                    synchronized (_lock)
-                    {
+        getProtocolHandler().writeFrame(ackFrame);
+    }
 
-                        while (connectionStopped())
-                        {
-                            _lock.wait();
-                        }
+    /**
+     * Binds the named queue, with the specified routing key, to the named exchange.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param queueName    The name of the queue to bind.
+     * @param routingKey   The routing key to bind the queue with.
+     * @param arguments    Additional arguments.
+     * @param exchangeName The exchange to bind the queue on.
+     *
+     * @throws AMQException If the queue cannot be bound for any reason.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
+     */
+    public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+                          final AMQShortString exchangeName) throws AMQException
+    {
+        /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
+        new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
+            {
+                AMQFrame queueBind =
+                        QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                     arguments, // arguments
+                                                     exchangeName, // exchange
+                                                     false, // nowait
+                                                     queueName, // queue
+                                                     routingKey, // routingKey
+                                                     getTicket()); // ticket
 
-                        dispatchMessage(message);
+                getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
 
-                        while (connectionStopped())
-                        {
-                            _lock.wait();
-                        }
+                return null;
+            }
+        }, _connection).execute();
+    }
 
-                    }
-
-                }
-            }
-            catch (InterruptedException e)
-            {
-                //ignore
-            }
-            if (_dispatcherLogger.isInfoEnabled())
-            {
-                _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
-            }
-        }
+    /**
+     * Closes the session with no timeout.
+     *
+     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        close(-1);
+    }
 
-        // only call while holding lock
-        final boolean connectionStopped()
+    /**
+     * Closes the session.
+     *
+     * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close the
+     * channel. This is because the channel is marked as closed before the request to close it is made, so the fail-over
+     * should not re-open it.
+     *
+     * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
+     *
+     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be re-opened. May
+     * need to examine this more carefully.
+     * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, because
+     * the failover process sends the failover event before acquiring the mutex itself.
+     */
+    public void close(long timeout) throws JMSException
+    {
+        if (_logger.isInfoEnabled())
         {
-            return _connectionStopped;
+            _logger.info("Closing session: " + this + ":"
+                         + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
-        boolean setConnectionStopped(boolean connectionStopped)
+        // We must close down all producers and consumers in an orderly fashion. This is the only method
+        // that can be called from a different thread of control from the one controlling the session.
+        synchronized (_connection.getFailoverMutex())
         {
-            boolean currently;
-            synchronized (_lock)
+            // Ensure we only try and close an open session.
+            if (!_closed.getAndSet(true))
             {
-                currently = _connectionStopped;
-                _connectionStopped = connectionStopped;
-                _lock.notify();
+                // we pass null since this is not an error case
+                closeProducersAndConsumers(null);
 
-                if (_dispatcherLogger.isDebugEnabled())
+                try
                 {
-                    _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
-                                            ": Currently " + (currently ? "Stopped" : "Started"));
-                }
-            }
-            return currently;
-        }
 
-        private void dispatchMessage(UnprocessedMessage message)
-        {
-            if (message.getDeliverBody() != null)
-            {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+                    getProtocolHandler().closeSession(this);
+
+                    final AMQFrame frame =
+                            ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                            0, // classId
+                                                            0, // methodId
+                                                            AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                            new AMQShortString("JMS client closing channel")); // replyText
+
+                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
 
-                if (consumer == null || consumer.isClosed())
+                    // When control resumes at this point, a reply will have been received that
+                    // indicates the broker has closed the channel successfully.
+                }
+                catch (AMQException e)
                 {
-                    if (_dispatcherLogger.isInfoEnabled())
-                    {
-                        if (consumer == null)
-                        {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
-                                                   "[" + message.getDeliverBody().deliveryTag + "] from queue " +
-                                                   message.getDeliverBody().consumerTag +
-                                                   " )without a handler - rejecting(requeue)...");
-                        }
-                        else
-                        {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
-                                                   "[" + message.getDeliverBody().deliveryTag + "] from queue " +
-                                                   " consumer(" + consumer.debugIdentity() +
-                                                   ") is closed rejecting(requeue)...");
-                        }
-                    }
-                    // Don't reject if we're already closing
-                    if (!_closed.get())
-                    {
-                        rejectMessage(message, true);
-                    }
+                    JMSException jmse = new JMSException("Error closing session: " + e);
+                    jmse.setLinkedException(e);
+                    throw jmse;
                 }
-                else
+                // This is ignored because the channel is already marked as closed so the fail-over process will
+                // not re-open it.
+                catch (FailoverException e)
                 {
-                    consumer.notifyMessage(message, _channelId);
+                    _logger.debug(
+                            "Got FailoverException during channel close, ignored as channel already marked as closed.");
+                }
+                finally
+                {
+                    _connection.deregisterSession(_channelId);
                 }
             }
         }
+    }
 
-        public void close()
+    /**
+     * Called when the server initiates the closure of the session unilaterally.
+     *
+     * @param e the exception that caused this session to be closed. Null causes the
+     */
+    public void closed(Throwable e) throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
         {
+            // An AMQException has an error code and message already and will be passed in when closure occurs as a
+            // result of a channel close request
             _closed.set(true);
-            interrupt();
-
-            //fixme awaitTermination
+            AMQException amqe;
+            if (e instanceof AMQException)
+            {
+                amqe = (AMQException) e;
+            }
+            else
+            {
+                amqe = new AMQException(null, "Closing session forcibly", e);
+            }
 
+            _connection.deregisterSession(_channelId);
+            closeProducersAndConsumers(amqe);
         }
+    }
 
-        public void rollback()
-        {
+    /**
+     * Commits all messages done in this transaction and releases any locks currently held.
+     *
+     * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
+     * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
+     * client will be unable to determine whether or not the commit actually happened on the broker in this case.
+     *
+     * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
+     *                      not mean that the commit is known to have failed, merely that it is not known whether it
+     *                      failed or not.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    public void commit() throws JMSException
+    {
+        checkTransacted();
 
-            synchronized (_lock)
+        try
+        {
+            // Acknowledge up to message last delivered (if any) for each consumer.
+            // need to send ack for messages delivered to consumers so far
+            for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
             {
-                boolean isStopped = connectionStopped();
-
-                if (!isStopped)
-                {
-                    setConnectionStopped(true);
-                }
-
-                rejectAllMessages(true);
-
-                _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
-
-                for (BasicMessageConsumer consumer : _consumers.values())
-                {
-                    if (!consumer.isNoConsume())
-                    {
-                        consumer.rollback();
-                    }
-                    else
-                    {
-                        // should perhaps clear the _SQ here.
-                        //consumer._synchronousQueue.clear();
-                        consumer.clearReceiveQueue();
-                    }
-
-
-                }
-
-                setConnectionStopped(isStopped);
+                // Sends acknowledgement to server
+                i.next().acknowledgeLastDelivered();
             }
 
+            // Commits outstanding messages sent and outstanding acknowledgements.
+            final AMQProtocolHandler handler = getProtocolHandler();
+
+            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+                              TxCommitOkBody.class);
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
         }
+        catch (FailoverException e)
+        {
+            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+        }
+    }
 
-        public void rejectPending(BasicMessageConsumer consumer)
+    public void confirmConsumerCancelled(AMQShortString consumerTag)
+    {
+
+        // Remove the consumer from the map
+        BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+        if (consumer != null)
         {
-            synchronized (_lock)
+            // fixme this isn't right.. needs to check if _queue contains data for this consumer
+            if (consumer.isAutoClose()) // && _queue.isEmpty())
             {
-                boolean stopped = _dispatcher.connectionStopped();
+                consumer.closeWhenNoMessages(true);
+            }
 
-                if (!stopped)
+            if (!consumer.isNoConsume())
+            {
+                // Clean the Maps up first
+                // Flush any pending messages for this consumerTag
+                if (_dispatcher != null)
                 {
-                    _dispatcher.setConnectionStopped(true);
+                    _logger.info("Dispatcher is not null");
                 }
+                else
+                {
+                    _logger.info("Dispatcher is null so created stopped dispatcher");
 
-                // Reject messages on pre-receive queue
-                consumer.rollback();
-
-                // Reject messages on pre-dispatch queue
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
-
-                // closeConsumer
-                consumer.markClosed();
-
-                _dispatcher.setConnectionStopped(stopped);
+                    startDistpatcherIfNecessary(true);
+                }
 
+                _dispatcher.rejectPending(consumer);
             }
-        }
-    }
-
-
-
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
-    {
-
-        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
-        _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
-        _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+            else
+            {
+                // Just close the consumer
+                // fixme  the CancelOK is being processed before the arriving messages..
+                // The dispatcher is still to process them so the server sent in order but the client
+                // has yet to receive before the close comes in.
 
-        _connection = con;
-        _transacted = transacted;
-        if (transacted)
-        {
-            _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
+                // consumer.markClosed();
+            }
         }
         else
         {
-            _acknowledgeMode = acknowledgeMode;
+            _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
         }
-        _channelId = channelId;
-        _messageFactoryRegistry = messageFactoryRegistry;
-        _defaultPrefetchHighMark = defaultPrefetchHighMark;
-        _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
-        if (_acknowledgeMode == NO_ACKNOWLEDGE)
+    }
+
+    public QueueBrowser createBrowser(Queue queue) throws JMSException
+    {
+        if (isStrictAMQP())
         {
-            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
-                                                      new FlowControllingBlockingQueue.ThresholdListener()
-                                                      {
-                                                          public void aboveThreshold(int currentValue)
-                                                          {
-                                                              if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                              {
-                                                                  _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
-                                                                  new Thread(new SuspenderRunner(true)).start();
-                                                              }
-                                                          }
-
-                                                          public void underThreshold(int currentValue)
-                                                          {
-                                                              if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                                                              {
-                                                                  _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
-                                                                  new Thread(new SuspenderRunner(false)).start();
-                                                              }
-                                                          }
-                                                      });
+            throw new UnsupportedOperationException();
         }
-        else
+
+        return createBrowser(queue, null);
+    }
+
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+    {
+        if (isStrictAMQP())
         {
-            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+            throw new UnsupportedOperationException();
         }
-    }
 
+        checkNotClosed();
+        checkValidQueue(queue);
 
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
-    {
-        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+        return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
     }
 
-    public AMQConnection getAMQConnection()
+    public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
     {
-        return _connection;
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+                                  messageSelector, null, true, true);
     }
 
     public BytesMessage createBytesMessage() throws JMSException
@@ -506,1506 +678,1548 @@
         synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
+
             return new JMSBytesMessage();
         }
     }
 
-    public MapMessage createMapMessage() throws JMSException
+    public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-            return new JMSMapMessage();
-        }
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
+                                  false, false);
     }
 
-    public javax.jms.Message createMessage() throws JMSException
+    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
-        return createBytesMessage();
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
+                                  messageSelector, null, false, false);
     }
 
-    public ObjectMessage createObjectMessage() throws JMSException
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-            return (ObjectMessage) new JMSObjectMessage();
-        }
-    }
+        checkValidDestination(destination);
 
-    public ObjectMessage createObjectMessage(Serializable object) throws JMSException
-    {
-        ObjectMessage msg = createObjectMessage();
-        msg.setObject(object);
-        return msg;
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+                                  messageSelector, null, false, false);
     }
 
-    public StreamMessage createStreamMessage() throws JMSException
+    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+                                          String selector) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
+        checkValidDestination(destination);
 
-            return new JMSStreamMessage();
-        }
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
     }
 
-    public TextMessage createTextMessage() throws JMSException
+    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+                                          boolean exclusive, String selector) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
+        checkValidDestination(destination);
 
-            return new JMSTextMessage();
-        }
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
     }
 
-    public TextMessage createTextMessage(String text) throws JMSException
+    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+                                          String selector, FieldTable rawSelector) throws JMSException
     {
+        checkValidDestination(destination);
 
-        TextMessage msg = createTextMessage();
-        msg.setText(text);
-        return msg;
+        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
     }
 
-    public boolean getTransacted() throws JMSException
+    public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+                                          boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
     {
-        checkNotClosed();
-        return _transacted;
-    }
+        checkValidDestination(destination);
 
-    public int getAcknowledgeMode() throws JMSException
-    {
-        checkNotClosed();
-        return _acknowledgeMode;
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+                                  false);
     }
 
-    public void commit() throws JMSException
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
     {
-        checkTransacted();
-        try
+
+        checkNotClosed();
+        AMQTopic origTopic = checkValidTopic(topic);
+        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+        if (subscriber != null)
         {
-            // Acknowledge up to message last delivered (if any) for each consumer.
-            //need to send ack for messages delivered to consumers so far
-            for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+            if (subscriber.getTopic().equals(topic))
             {
-                //Sends acknowledgement to server
-                i.next().acknowledgeLastDelivered();
+                throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+                                                + name);
+            }
+            else
+            {
+                unsubscribe(name);
             }
-
-            // Commits outstanding messages sent and outstanding acknowledgements.
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            final AMQProtocolHandler handler = getProtocolHandler();
-
-            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
-                                                          getProtocolMajorVersion(),
-                                                          getProtocolMinorVersion()),
-                              TxCommitOkBody.class);
-        }
-        catch (AMQException e)
-        {
-            JMSException exception = new JMSException("Failed to commit: " + e.getMessage());
-            exception.setLinkedException(e);
-            throw exception;
         }
-    }
-
-
-    public void rollback() throws JMSException
-    {
-        synchronized (_suspensionLock)
+        else
         {
-            checkTransacted();
-            try
+            AMQShortString topicName;
+            if (topic instanceof AMQTopic)
             {
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-
-                boolean isSuspended = isSuspended();
+                topicName = ((AMQTopic) topic).getDestinationName();
+            }
+            else
+            {
+                topicName = new AMQShortString(topic.getTopicName());
+            }
 
-                if (!isSuspended)
+            if (_strictAMQP)
+            {
+                if (_strictAMQPFATAL)
                 {
-                    suspendChannel(true);
+                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
                 }
-
-                if (_dispatcher != null)
+                else
                 {
-                    _dispatcher.rollback();
+                    _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+                                 + "for creation durableSubscriber. Requesting queue deletion regardless.");
                 }
 
-                _connection.getProtocolHandler().syncWrite(
-                        TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
-
-                if (!isSuspended)
-                {
-                    suspendChannel(false);
-                }
+                deleteQueue(dest.getAMQQueueName());
             }
-            catch (AMQException e)
+            else
             {
-                throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+                // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+                    && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
             }
         }
+
+        subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
     }
 
-    public void close() throws JMSException
+    /** Note, currently this does not handle reuse of the same name with different topics correctly. */
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+            throws JMSException
     {
-        close(-1);
+        checkNotClosed();
+        checkValidTopic(topic);
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
     }
 
-    public void close(long timeout) throws JMSException
+    public MapMessage createMapMessage() throws JMSException
     {
-        if (_logger.isInfoEnabled())
+        synchronized (_connection.getFailoverMutex())
         {
-            _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+            checkNotClosed();
+
+            return new JMSMapMessage();
         }
+    }
 
-        // We must close down all producers and consumers in an orderly fashion. This is the only method
-        // that can be called from a different thread of control from the one controlling the session
+    public javax.jms.Message createMessage() throws JMSException
+    {
+        return createBytesMessage();
+    }
+
+    public ObjectMessage createObjectMessage() throws JMSException
+    {
         synchronized (_connection.getFailoverMutex())
         {
-            //Ensure we only try and close an open session.
-            if (!_closed.getAndSet(true))
-            {
-                // we pass null since this is not an error case
-                closeProducersAndConsumers(null);
-
-                try
-                {
+            checkNotClosed();
 
-                    getProtocolHandler().closeSession(this);
-                    // TODO: Be aware of possible changes to parameter order as versions change.
-                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
-                                                                           getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                           0,    // classId
-                                                                           0,    // methodId
-                                                                           AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
-                                                                           new AMQShortString("JMS client closing channel"));    // replyText
+            return (ObjectMessage) new JMSObjectMessage();
+        }
+    }
 
-                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
-                    // When control resumes at this point, a reply will have been received that
-                    // indicates the broker has closed the channel successfully
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException
+    {
+        ObjectMessage msg = createObjectMessage();
+        msg.setObject(object);
 
-                }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing session: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
-                finally
-                {
-                    _connection.deregisterSession(_channelId);
-                }
-            }
-        }
+        return msg;
     }
 
-    private AMQProtocolHandler getProtocolHandler()
+    public BasicMessageProducer createProducer(Destination destination) throws JMSException
     {
-        return _connection.getProtocolHandler();
+        return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
     }
 
+    public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+    {
+        return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+    }
 
-    private byte getProtocolMinorVersion()
+    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+            throws JMSException
     {
-        return getProtocolHandler().getProtocolMinorVersion();
+        return createProducerImpl(destination, mandatory, immediate);
     }
 
-    private byte getProtocolMajorVersion()
+    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+                                               boolean waitUntilSent) throws JMSException
     {
-        return getProtocolHandler().getProtocolMajorVersion();
+        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
     }
 
+    public TopicPublisher createPublisher(Topic topic) throws JMSException
+    {
+        checkNotClosed();
 
-    /**
-     * Close all producers or consumers. This is called either in the error case or when closing the session normally.
-     *
-     * @param amqe the exception, may be null to indicate no error has occurred
-     */
-    private void closeProducersAndConsumers(AMQException amqe) throws JMSException
+        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+    }
+
+    public Queue createQueue(String queueName) throws JMSException
     {
-        JMSException jmse = null;
-        try
-        {
-            closeProducers();
-        }
-        catch (JMSException e)
-        {
-            _logger.error("Error closing session: " + e, e);
-            jmse = e;
-        }
-        try
+        checkNotClosed();
+        if (queueName.indexOf('/') == -1)
         {
-            closeConsumers(amqe);
+            return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
         }
-        catch (JMSException e)
+        else
         {
-            _logger.error("Error closing session: " + e, e);
-            if (jmse == null)
+            try
             {
-                jmse = e;
+                return new AMQQueue(new AMQBindingURL(queueName));
             }
-        }
-        if (jmse != null)
-        {
-            throw jmse;
-        }
-    }
-
+            catch (URLSyntaxException urlse)
+            {
+                JMSException jmse = new JMSException(urlse.getReason());
+                jmse.setLinkedException(urlse);
 
-    public boolean isSuspended()
-    {
-        return _suspended;
+                throw jmse;
+            }
+        }
     }
 
-
     /**
-     * Called when the server initiates the closure of the session unilaterally.
+     * Declares the named queue.
      *
-     * @param e the exception that caused this session to be closed. Null causes the
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param name       The name of the queue to declare.
+     * @param autoDelete
+     * @param durable    Flag to indicate that the queue is durable.
+     * @param exclusive  Flag to indicate that the queue is exclusive to this client.
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     * @todo Be aware of possible changes to parameter order as versions change.
      */
-    public void closed(Throwable e) throws JMSException
+    public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+                            final boolean exclusive) throws AMQException
     {
-        synchronized (_connection.getFailoverMutex())
+        new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
-            // An AMQException has an error code and message already and will be passed in when closure occurs as a
-            // result of a channel close request
-            _closed.set(true);
-            AMQException amqe;
-            if (e instanceof AMQException)
-            {
-                amqe = (AMQException) e;
-            }
-            else
+            public Object execute() throws AMQException, FailoverException
             {
-                amqe = new AMQException(null, "Closing session forcibly", e);
+                AMQFrame queueDeclare =
+                        QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                        null, // arguments
+                                                        autoDelete, // autoDelete
+                                                        durable, // durable
+                                                        exclusive, // exclusive
+                                                        false, // nowait
+                                                        false, // passive
+                                                        name, // queue
+                                                        getTicket()); // ticket
+
+                getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+                return null;
             }
-            _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);
-        }
+        }, _connection).execute();
     }
 
     /**
-     * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
-     * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
+     * Creates a QueueReceiver
+     *
+     * @param destination
+     *
+     * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
+     * @throws JMSException
      */
-    void markClosed()
+    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
     {
-        _closed.set(true);
-        _connection.deregisterSession(_channelId);
-        markClosedProducersAndConsumers();
+        checkValidDestination(destination);
+        AMQQueue dest = (AMQQueue) destination;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
 
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
-    private void markClosedProducersAndConsumers()
+    /**
+     * Creates a QueueReceiver using a message selector
+     *
+     * @param destination
+     * @param messageSelector
+     *
+     * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
+     * @throws JMSException
+     */
+    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
     {
-        try
-        {
-            // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
-            closeProducers();
-        }
-        catch (JMSException e)
-        {
-            _logger.error("Error closing session: " + e, e);
-        }
-        try
-        {
-            markClosedConsumers();
-        }
-        catch (JMSException e)
-        {
-            _logger.error("Error closing session: " + e, e);
-        }
+        checkValidDestination(destination);
+        AMQQueue dest = (AMQQueue) destination;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
     /**
-     * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
-     * currently no way of propagating errors to message producers (this is a JMS limitation).
+     * Creates a QueueReceiver wrapping a MessageConsumer
+     *
+     * @param queue
+     *
+     * @return QueueReceiver
+     *
+     * @throws JMSException
      */
-    private void closeProducers() throws JMSException
+    public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
-        // we need to clone the list of producers since the close() method updates the _producers collection
-        // which would result in a concurrent modification exception
-        final ArrayList clonedProducers = new ArrayList(_producers.values());
+        checkNotClosed();
+        AMQQueue dest = (AMQQueue) queue;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
 
-        final Iterator it = clonedProducers.iterator();
-        while (it.hasNext())
-        {
-            final BasicMessageProducer prod = (BasicMessageProducer) it.next();
-            prod.close();
-        }
-        // at this point the _producers map is empty
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
     /**
-     * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+     * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
      *
-     * @param error not null if this is a result of an error occurring at the connection level
+     * @param queue
+     * @param messageSelector
+     *
+     * @return QueueReceiver
+     *
+     * @throws JMSException
      */
-    private void closeConsumers(Throwable error) throws JMSException
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
-        if (_dispatcher != null)
-        {
-            _dispatcher.close();
-            _dispatcher = null;
-        }
-        // we need to clone the list of consumers since the close() method updates the _consumers collection
-        // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values());
+        checkNotClosed();
+        AMQQueue dest = (AMQQueue) queue;
+        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
-        while (it.hasNext())
-        {
-            final BasicMessageConsumer con = it.next();
-            if (error != null)
-            {
-                con.notifyError(error);
-            }
-            else
-            {
-                con.close();
-            }
-        }
-        // at this point the _consumers map will be empty
+        return new QueueReceiverAdaptor(dest, consumer);
     }
 
-    private void markClosedConsumers() throws JMSException
+    public QueueSender createSender(Queue queue) throws JMSException
     {
-        if (_dispatcher != null)
-        {
-            _dispatcher.close();
-            _dispatcher = null;
-        }
-        // we need to clone the list of consumers since the close() method updates the _consumers collection
-        // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+        checkNotClosed();
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
-        while (it.hasNext())
+        // return (QueueSender) createProducer(queue);
+        return new QueueSenderAdapter(createProducer(queue), queue);
+    }
+
+    public StreamMessage createStreamMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
         {
-            final BasicMessageConsumer con = it.next();
-            con.markClosed();
+            checkNotClosed();
+
+            return new JMSStreamMessage();
         }
-        // at this point the _consumers map will be empty
     }
 
     /**
-     * Asks the broker to resend all unacknowledged messages for the session.
+     * Creates a non-durable subscriber
+     *
+     * @param topic
+     *
+     * @return TopicSubscriber - a wrapper round our MessageConsumer
      *
      * @throws JMSException
      */
-    public void recover() throws JMSException
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        checkNotTransacted(); // throws IllegalStateException if a transacted session
-        // this is set only here, and the before the consumer's onMessage is called it is set to false
-        _inRecovery = true;
-        try
+        AMQTopic dest = checkValidTopic(topic);
+
+        // AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+    }
+
+    /**
+     * Creates a non-durable subscriber with a message selector
+     *
+     * @param topic
+     * @param messageSelector
+     * @param noLocal
+     *
+     * @return TopicSubscriber - a wrapper round our MessageConsumer
+     *
+     * @throws JMSException
+     */
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+    {
+        checkNotClosed();
+        AMQTopic dest = checkValidTopic(topic);
+
+        // AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+    }
+
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        checkNotClosed();
+
+        return new AMQTemporaryQueue(this);
+    }
+
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        checkNotClosed();
+
+        return new AMQTemporaryTopic(this);
+    }
+
+    public TextMessage createTextMessage() throws JMSException
+    {
+        synchronized (_connection.getFailoverMutex())
         {
+            checkNotClosed();
 
-            boolean isSuspended = isSuspended();
+            return new JMSTextMessage();
+        }
+    }
 
-            if (!isSuspended)
-            {
-                suspendChannel(true);
-            }
+    public TextMessage createTextMessage(String text) throws JMSException
+    {
 
-            for (BasicMessageConsumer consumer : _consumers.values())
-            {
-                consumer.clearUnackedMessages();
-            }
+        TextMessage msg = createTextMessage();
+        msg.setText(text);
 
-            if (_dispatcher != null)
-            {
-                _dispatcher.rollback();
-            }
+        return msg;
+    }
 
-            if (isStrictAMQP())
+    public Topic createTopic(String topicName) throws JMSException
+    {
+        checkNotClosed();
+
+        if (topicName.indexOf('/') == -1)
+        {
+            return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+        }
+        else
+        {
+            try
             {
-                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
-                _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                            getProtocolMajorVersion(),
-                                                                                            getProtocolMinorVersion(),
-                                                                                            false));    // requeue
-                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+                return new AMQTopic(new AMQBindingURL(topicName));
             }
-            else
+            catch (URLSyntaxException urlse)
             {
+                JMSException jmse = new JMSException(urlse.getReason());
+                jmse.setLinkedException(urlse);
 
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                           getProtocolMajorVersion(),
-                                                                                           getProtocolMinorVersion(),
-                                                                                           false)    // requeue
-                        , BasicRecoverOkBody.class);
-            }
-            if (!isSuspended)
-            {
-                suspendChannel(false);
+                throw jmse;
             }
         }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException(e);
-        }
     }
 
-    boolean isInRecovery()
+    public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
     {
-        return _inRecovery;
+        declareExchange(name, type, getProtocolHandler(), nowait);
     }
 
-    void setInRecovery(boolean inRecovery)
+    public int getAcknowledgeMode() throws JMSException
     {
-        _inRecovery = inRecovery;
+        checkNotClosed();
+
+        return _acknowledgeMode;
     }
 
-    public void acknowledge() throws JMSException
+    public AMQConnection getAMQConnection()
     {
-        if (isClosed())
-        {
-            throw new IllegalStateException("Session is already closed");
-        }
-        for (BasicMessageConsumer consumer : _consumers.values())
-        {
-            consumer.acknowledge();
-        }
+        return _connection;
+    }
 
+    public int getChannelId()
+    {
+        return _channelId;
+    }
 
+    public int getDefaultPrefetch()
+    {
+        return _defaultPrefetchHighMark;
     }
 
+    public int getDefaultPrefetchHigh()
+    {
+        return _defaultPrefetchHighMark;
+    }
 
-    public MessageListener getMessageListener() throws JMSException
+    public int getDefaultPrefetchLow()
     {
-//        checkNotClosed();
-        return _messageListener;
+        return _defaultPrefetchLowMark;
     }
 
-    public void setMessageListener(MessageListener listener) throws JMSException
+    public AMQShortString getDefaultQueueExchangeName()
     {
-//        checkNotClosed();
-//
-//        if (_dispatcher != null && !_dispatcher.connectionStopped())
-//        {
-//            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
-//        }
-//
-//        // We are stopped
-//        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-//        {
-//            BasicMessageConsumer consumer = i.next();
-//
-//            if (consumer.isReceiving())
-//            {
-//                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
-//            }
-//        }
-//
-//        _messageListener = listener;
-//
-//        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-//        {
-//            i.next().setMessageListener(_messageListener);
-//        }
+        return _connection.getDefaultQueueExchangeName();
+    }
 
+    public AMQShortString getDefaultTopicExchangeName()
+    {
+        return _connection.getDefaultTopicExchangeName();
     }
 
-    public void run()
+    public MessageListener getMessageListener() throws JMSException
     {
-        throw new java.lang.UnsupportedOperationException();
+        // checkNotClosed();
+        return _messageListener;
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
-                                               boolean immediate, boolean waitUntilSent)
-            throws JMSException
+    public AMQShortString getTemporaryQueueExchangeName()
     {
-        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
+        return _connection.getTemporaryQueueExchangeName();
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
-            throws JMSException
+    public AMQShortString getTemporaryTopicExchangeName()
     {
-        return createProducerImpl(destination, mandatory, immediate);
+        return _connection.getTemporaryTopicExchangeName();
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean immediate)
-            throws JMSException
+    public int getTicket()
     {
-        return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+        return _ticket;
     }
 
-    public BasicMessageProducer createProducer(Destination destination) throws JMSException
+    public boolean getTransacted() throws JMSException
     {
-        return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+        checkNotClosed();
+
+        return _transacted;
     }
 
-    private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
-                                                    boolean immediate)
-            throws JMSException
+    public boolean hasConsumer(Destination destination)
     {
-        return createProducerImpl(destination, mandatory, immediate, false);
+        AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+        return (counter != null) && (counter.get() != 0);
     }
 
-    private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
-                                                    final boolean immediate, final boolean waitUntilSent)
-            throws JMSException
+    public boolean isStrictAMQP()
     {
-        return (BasicMessageProducer) new FailoverSupport()
-        {
-            public Object operation() throws JMSException
-            {
-                checkNotClosed();
-                long producerId = getNextProducerId();
-                BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                                                         AMQSession.this, getProtocolHandler(),
-                                                                         producerId, immediate, mandatory, waitUntilSent);
-                registerProducer(producerId, producer);
-                return producer;
-            }
-        }.execute(_connection);
+        return _strictAMQP;
+    }
+
+    public boolean isSuspended()
+    {
+        return _suspended;
     }
 
     /**
-     * Creates a QueueReceiver
-     *
-     * @param destination
-     *
-     * @return QueueReceiver - a wrapper around our MessageConsumer
+     * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+     * the queue read by the dispatcher.
      *
-     * @throws JMSException
+     * @param message the message that has been received
      */
-    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
+    public void messageReceived(UnprocessedMessage message)
     {
-        checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
-        return new QueueReceiverAdaptor(dest, consumer);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Message["
+                          + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+                          + "] received in session with channel id " + _channelId);
+        }
+
+        if (message.getDeliverBody() == null)
+        {
+            // Return of the bounced message.
+            returnBouncedMessage(message);
+        }
+        else
+        {
+            _queue.add(message);
+        }
     }
 
     /**
-     * Creates a QueueReceiver using a message selector
+     * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
      *
-     * @param destination
-     * @param messageSelector
+     * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
+     * all messages that have been delivered to the client.
      *
-     * @return QueueReceiver - a wrapper around our MessageConsumer
+     * <p/>Restarting a session causes it to take the following actions:
      *
-     * @throws JMSException
+     * <ul> <li>Stop message delivery.</li> <li>Mark all messages that might have been delivered but not acknowledged as
+     * "redelivered". <li>Restart the delivery sequence including all unacknowledged messages that had been previously
+     * delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.</li> </ul>
+     *
+     * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
+     * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
+     * for the client to determine whether the broker is going to recover the session or not.
+     *
+     * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
+     *                      Not that this does not necessarily mean that the recovery has failed, but simply that it is
+     *                      not possible to tell if it has or not.
+     * @todo Be aware of possible changes to parameter order as versions change.
      */
-    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
+    public void recover() throws JMSException
     {
-        checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer)
-                createConsumer(destination, messageSelector);
-        return new QueueReceiverAdaptor(dest, consumer);
-    }
+        // Ensure that the session is open.
+        checkNotClosed();
 
-    public MessageConsumer createConsumer(Destination destination) throws JMSException
-    {
-        checkValidDestination(destination);
-        return createConsumerImpl(destination,
-                                  _defaultPrefetchHighMark,
-                                  _defaultPrefetchLowMark,
-                                  false,
-                                  false,
-                                  null,
-                                  null,
-                                  false,
-                                  false);
-    }
+        // Ensure that the session is not transacted.
+        checkNotTransacted();
 
-    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
-    {
-        checkValidDestination(destination);
-        return createConsumerImpl(destination,
-                                  _defaultPrefetchHighMark,
-                                  _defaultPrefetchLowMark,
-                                  false,
-                                  false,
-                                  messageSelector,
-                                  null,
-                                  false,
-                                  false);
-    }
+        // this is set only here, and the before the consumer's onMessage is called it is set to false
+        _inRecovery = true;
+        try
+        {
 
-    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
-            throws JMSException
-    {
-        checkValidDestination(destination);
-        return createConsumerImpl(destination,
-                                  _defaultPrefetchHighMark,
-                                  _defaultPrefetchLowMark,
-                                  noLocal,
-                                  false,
-                                  messageSelector,
-                                  null,
-                                  false,
-                                  false);
-    }
+            boolean isSuspended = isSuspended();
 
-    public MessageConsumer createBrowserConsumer(Destination destination,
-                                                 String messageSelector,
-                                                 boolean noLocal)
-            throws JMSException
-    {
-        checkValidDestination(destination);
-        return createConsumerImpl(destination,
-                                  _defaultPrefetchHighMark,
-                                  _defaultPrefetchLowMark,
-                                  noLocal,
-                                  false,
-                                  messageSelector,
-                                  null,
-                                  true,
-                                  true);
+            if (!isSuspended)
+            {
+                suspendChannel(true);
+            }
+
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                consumer.clearUnackedMessages();
+            }
+
+            if (_dispatcher != null)
+            {
+                _dispatcher.rollback();
+            }
+
+            if (isStrictAMQP())
+            {
+                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+                _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                            getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+            }
+            else
+            {
+
+                _connection.getProtocolHandler().syncWrite(
+                        BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+                        , BasicRecoverOkBody.class);
+            }
+
+            if (!isSuspended)
+            {
+                suspendChannel(false);
+            }
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Recover failed: " + e.getMessage(), e);
+        }
+        catch (FailoverException e)
+        {
+            throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
+        }
     }
 
-    public MessageConsumer createConsumer(Destination destination,
-                                          int prefetch,
-                                          boolean noLocal,
-                                          boolean exclusive,
-                                          String selector) throws JMSException
+    public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
-        checkValidDestination(destination);
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
-    }
 
+        if (_logger.isTraceEnabled())
+        {
+            _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+        }
 
-    public MessageConsumer createConsumer(Destination destination,
-                                          int prefetchHigh,
-                                          int prefetchLow,
-                                          boolean noLocal,

[... 2269 lines stripped ...]