You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC

svn commit: r1299257 [15/26] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/ broker-plugins/access-control/src/main/java/org/apache/qpid/serve...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Mar 10 19:22:10 2012
@@ -20,50 +20,8 @@
  */
 package org.apache.qpid.client;
 
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQDisconnectedException;
@@ -89,7 +47,7 @@ import org.apache.qpid.client.message.Un
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
@@ -98,8 +56,27 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * <p/><table id="crc"><caption>CRC Card</caption>
@@ -119,150 +96,65 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
 {
-    public static final class IdToConsumerMap<C extends BasicMessageConsumer>
-    {
-        private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
-        private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
-
-        public C get(int id)
-        {
-            if ((id & 0xFFFFFFF0) == 0)
-            {
-                return (C) _fastAccessConsumers[id];
-            }
-            else
-            {
-                return _slowAccessConsumers.get(id);
-            }
-        }
-
-        public C put(int id, C consumer)
-        {
-            C oldVal;
-            if ((id & 0xFFFFFFF0) == 0)
-            {
-                oldVal = (C) _fastAccessConsumers[id];
-                _fastAccessConsumers[id] = consumer;
-            }
-            else
-            {
-                oldVal = _slowAccessConsumers.put(id, consumer);
-            }
-
-            return oldVal;
-
-        }
-
-        public C remove(int id)
-        {
-            C consumer;
-            if ((id & 0xFFFFFFF0) == 0)
-            {
-                consumer = (C) _fastAccessConsumers[id];
-                _fastAccessConsumers[id] = null;
-            }
-            else
-            {
-                consumer = _slowAccessConsumers.remove(id);
-            }
-
-            return consumer;
-
-        }
+    /** Used for debugging. */
+    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
-        public Collection<C> values()
-        {
-            ArrayList<C> values = new ArrayList<C>();
+    /** System property to enable strict AMQP compliance. */
+    public static final String STRICT_AMQP = "STRICT_AMQP";
 
-            for (int i = 0; i < 16; i++)
-            {
-                if (_fastAccessConsumers[i] != null)
-                {
-                    values.add((C) _fastAccessConsumers[i]);
-                }
-            }
-            values.addAll(_slowAccessConsumers.values());
+    /** Strict AMQP default setting. */
+    public static final String STRICT_AMQP_DEFAULT = "false";
 
-            return values;
-        }
+    /** System property to enable failure if strict AMQP compliance is violated. */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
 
-        public void clear()
-        {
-            _slowAccessConsumers.clear();
-            for (int i = 0; i < 16; i++)
-            {
-                _fastAccessConsumers[i] = null;
-            }
-        }
-    }
+    /** Strict AMQP failure default. */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
 
-    final AMQSession<C, P> _thisSession = this;
-    
-    /** Used for debugging. */
-    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+    /** System property to enable immediate message prefetching. */
+    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
 
-    /**
-     * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
-     * not need to be attached to a queue.
-     */
-    protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+    /** Immediate message prefetch default. */
+    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
-    /**
-     * The default value for mandatory flag used by producers created by this session is true. That is, server will not
-     * silently drop messages where no queue is connected to the exchange for the message.
-     */
-    protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
 
     /**
      * The period to wait while flow controlled before sending a log message confirming that the session is still
      * waiting on flow control being revoked
      */
-    protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+    private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
 
     /**
      * The period to wait while flow controlled before declaring a failure
      */
-    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
-    protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+    private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
                                                                   DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
 
-    protected final boolean DECLARE_QUEUES =
+    private final boolean _delareQueues =
         Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
 
-    protected final boolean DECLARE_EXCHANGES =
+    private final boolean _declareExchanges =
         Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-    
-    protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
-
-    /** System property to enable strict AMQP compliance. */
-    public static final String STRICT_AMQP = "STRICT_AMQP";
-
-    /** Strict AMQP default setting. */
-    public static final String STRICT_AMQP_DEFAULT = "false";
 
-    /** System property to enable failure if strict AMQP compliance is violated. */
-    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
-
-    /** Strickt AMQP failure default. */
-    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
-    /** System property to enable immediate message prefetching. */
-    public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+    private final boolean _useAMQPEncodedMapMessage;
 
-    /** Immediate message prefetch default. */
-    public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+    /**
+     * Flag indicating to start dispatcher as a daemon thread
+     */
+    protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
 
     /** The connection to which this session belongs. */
-    protected AMQConnection _connection;
+    private AMQConnection _connection;
 
     /** Used to indicate whether or not this is a transactional session. */
-    protected final boolean _transacted;
+    private final boolean _transacted;
 
     /** Holds the sessions acknowledgement mode. */
-    protected final int _acknowledgeMode;
+    private final int _acknowledgeMode;
 
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
-    protected int _channelId;
+    private int _channelId;
 
     private int _ticket;
 
@@ -278,55 +170,30 @@ public abstract class AMQSession<C exten
     /** Used to indicate that this session has been started at least once. */
     private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
 
-    /**
-     * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly.  Note this only
-     * keeps a record of subscriptions which have been created in the current instance. It does not remember
-     * subscriptions between executions of the client.
-     */
-    protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+    private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
 
-    /**
-     * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
-     * up in the {@link #_subscriptions} map.
-     */
-    protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+    private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
 
-    /**
-     * Locks to keep access to subscriber details atomic.
-     * <p>
-     * Added for QPID2418
-     */
-    protected final Lock _subscriberDetails = new ReentrantLock(true);
-    protected final Lock _subscriberAccess = new ReentrantLock(true);
+    private final Lock _subscriberDetails = new ReentrantLock(true);
+    private final Lock _subscriberAccess = new ReentrantLock(true);
 
-    /**
-     * Used to hold incoming messages.
-     *
-     * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
-     */
-    protected final FlowControllingBlockingQueue _queue;
+    private final FlowControllingBlockingQueue _queue;
 
-    /** Holds the highest received delivery tag. */
-    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
 
-    /** Pre-fetched message tags */
-    protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+    private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    /** All the not yet acknowledged message tags */
-    protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    /** All the delivered message tags */
-    protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
+    private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    /** Holds the dispatcher thread for this session. */
-    protected Dispatcher _dispatcher;
+    private volatile Dispatcher _dispatcher;
 
-    protected Thread _dispatcherThread;
+    private volatile Thread _dispatcherThread;
 
-    /** Holds the message factory factory for this session. */
-    protected MessageFactoryRegistry _messageFactoryRegistry;
+    private MessageFactoryRegistry _messageFactoryRegistry;
 
     /** Holds all of the producers created by this session, keyed by their unique identifiers. */
     private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
@@ -337,11 +204,7 @@ public abstract class AMQSession<C exten
      */
     private int _nextTag = 1;
 
-    /**
-     * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
-     * consumer.
-     */
-    protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+    private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
 
     /**
      * Contains a list of consumers which have been removed but which might still have
@@ -367,10 +230,6 @@ public abstract class AMQSession<C exten
      */
     private volatile boolean _sessionInRecovery;
 
-    /**
-     * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
-     * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
-     */
     private volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
@@ -388,28 +247,163 @@ public abstract class AMQSession<C exten
      */
     private final Object _suspensionLock = new Object();
 
-    /**
-     * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
-     *
-     * @todo This is accessed only within a synchronized method, so does not need to be atomic.
-     */
-    protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
-    /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
-    protected final boolean _immediatePrefetch;
+    private final boolean _immediatePrefetch;
 
-    /** Indicates that warnings should be generated on violations of the strict AMQP. */
-    protected final boolean _strictAMQP;
+    private final boolean _strictAMQP;
 
-    /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
-    protected final boolean _strictAMQPFATAL;
+    private final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
     private boolean _dirty;
     /** Has failover occured on this session with outstanding actions to commit? */
     private boolean _failedOverDirty;
-    
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
+
+
+    /** Holds the highest received delivery tag. */
+    protected AtomicLong getHighestDeliveryTag()
+    {
+        return _highestDeliveryTag;
+    }
+
+    /** Pre-fetched message tags */
+    protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags()
+    {
+        return _prefetchedMessageTags;
+    }
+
+    /** All the not yet acknowledged message tags */
+    protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags()
+    {
+        return _unacknowledgedMessageTags;
+    }
+
+    /** All the delivered message tags */
+    protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags()
+    {
+        return _deliveredMessageTags;
+    }
+
+    /** Returns the dispatcher for this session. */
+    protected Dispatcher getDispatcher()
+    {
+        return _dispatcher;
+    }
+
+    protected Thread getDispatcherThread()
+    {
+        return _dispatcherThread;
+    }
+
+    /** Holds the message factory registry for this session. */
+    protected MessageFactoryRegistry getMessageFactoryRegistry()
+    {
+        return _messageFactoryRegistry;
+    }
+
+    /**
+     * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
+     * consumer.
+     */
+    protected IdToConsumerMap<C> getConsumers()
+    {
+        return _consumers;
+    }
+
+    protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
+    {
+        _usingDispatcherForCleanup = usingDispatcherForCleanup;
+    }
+
+    /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+    protected boolean isImmediatePrefetch()
+    {
+        return _immediatePrefetch;
+    }
+
+    public static final class IdToConsumerMap<C extends BasicMessageConsumer>
+    {
+        private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+        private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
+
+        public C get(int id)
+        {
+            if ((id & 0xFFFFFFF0) == 0)
+            {
+                return (C) _fastAccessConsumers[id];
+            }
+            else
+            {
+                return _slowAccessConsumers.get(id);
+            }
+        }
+
+        public C put(int id, C consumer)
+        {
+            C oldVal;
+            if ((id & 0xFFFFFFF0) == 0)
+            {
+                oldVal = (C) _fastAccessConsumers[id];
+                _fastAccessConsumers[id] = consumer;
+            }
+            else
+            {
+                oldVal = _slowAccessConsumers.put(id, consumer);
+            }
+
+            return oldVal;
+
+        }
+
+        public C remove(int id)
+        {
+            C consumer;
+            if ((id & 0xFFFFFFF0) == 0)
+            {
+                consumer = (C) _fastAccessConsumers[id];
+                _fastAccessConsumers[id] = null;
+            }
+            else
+            {
+                consumer = _slowAccessConsumers.remove(id);
+            }
+
+            return consumer;
+
+        }
+
+        public Collection<C> values()
+        {
+            ArrayList<C> values = new ArrayList<C>();
+
+            for (int i = 0; i < 16; i++)
+            {
+                if (_fastAccessConsumers[i] != null)
+                {
+                    values.add((C) _fastAccessConsumers[i]);
+                }
+            }
+            values.addAll(_slowAccessConsumers.values());
+
+            return values;
+        }
+
+        public void clear()
+        {
+            _slowAccessConsumers.clear();
+            for (int i = 0; i < 16; i++)
+            {
+                _fastAccessConsumers[i] = null;
+            }
+        }
+    }
+
     private static final class FlowControlIndicator
     {
         private volatile boolean _flowControl = true;
@@ -426,9 +420,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /** Flow control */
-    private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
     /**
      * Creates a new session on a connection.
      *
@@ -443,7 +434,7 @@ public abstract class AMQSession<C exten
     protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
-        USE_AMQP_ENCODED_MAP_MESSAGE = con == null ? true : !con.isUseLegacyMapMessageFormat();
+        _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
         _strictAMQPFATAL =
                 Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -479,7 +470,7 @@ public abstract class AMQSession<C exten
                                                          {
                                                              // If the session has been closed don't waste time creating a thread to do
                                                              // flow control
-                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                                                              {   
                                                                  // Only execute change if previous state
                                                                  // was False
@@ -507,7 +498,7 @@ public abstract class AMQSession<C exten
                                                          {
                                                              // If the session has been closed don't waste time creating a thread to do
                                                              // flow control
-                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                                                              {
                                                                  // Only execute change if previous state
                                                                  // was true
@@ -539,9 +530,9 @@ public abstract class AMQSession<C exten
         }
 
         // Add creation logging to tie in with the existing close logging
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.info("Created session:" + this);
+            _logger.debug("Created session:" + this);
         }
     }
 
@@ -730,17 +721,15 @@ public abstract class AMQSession<C exten
 
     private void close(long timeout, boolean sendClose) throws JMSException
     {
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
-            // StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-            _logger.info("Closing session: " + this); // + ":"
-            // Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+            _logger.debug("Closing session: " + this);
         }
 
         // Ensure we only try and close an open session.
-        if (!_closed.getAndSet(true))
+        if (!setClosed())
         {
-            _closing.set(true);
+            setClosing(true);
             synchronized (getFailoverMutex())
             {
                 // We must close down all producers and consumers in an orderly fashion. This is the only method
@@ -808,7 +797,7 @@ public abstract class AMQSession<C exten
 
         if (e instanceof AMQDisconnectedException)
         {
-            if (_dispatcher != null)
+            if (_dispatcherThread != null)
             {
                 // Failover failed and ain't coming back. Knife the dispatcher.
                 _dispatcherThread.interrupt();
@@ -817,9 +806,9 @@ public abstract class AMQSession<C exten
        }
 
         //if we don't have an exception then we can perform closing operations
-        _closing.set(e == null);
+        setClosing(e == null);
 
-        if (!_closed.getAndSet(true))
+        if (!setClosed())
         {
             synchronized (_messageDeliveryLock)
             {
@@ -903,11 +892,11 @@ public abstract class AMQSession<C exten
                 // Flush any pending messages for this consumerTag
                 if (_dispatcher != null)
                 {
-                    _logger.info("Dispatcher is not null");
+                    _logger.debug("Dispatcher is not null");
                 }
                 else
                 {
-                    _logger.info("Dispatcher is null so created stopped dispatcher");
+                    _logger.debug("Dispatcher is null so created stopped dispatcher");
                     startDispatcherIfNecessary(true);
                 }
 
@@ -918,18 +907,16 @@ public abstract class AMQSession<C exten
                 // Just close the consumer
                 // fixme  the CancelOK is being processed before the arriving messages..
                 // The dispatcher is still to process them so the server sent in order but the client
-                // has yet to receive before the close comes in.
-
-                // consumer.markClosed();
+                // has yet to receive before the close comes in
 
                 if (consumer.isAutoClose())
                 {
                     // There is a small window where the message is between the two queues in the dispatcher.
                     if (consumer.isClosed())
                     {
-                        if (_logger.isInfoEnabled())
+                        if (_logger.isDebugEnabled())
                         {
-                            _logger.info("Closing consumer:" + consumer.debugIdentity());
+                            _logger.debug("Closing consumer:" + consumer.debugIdentity());
                         }
 
                         deregisterConsumer(consumer);
@@ -953,6 +940,9 @@ public abstract class AMQSession<C exten
         return createBrowser(queue, null);
     }
 
+    /**
+     * Create a queue browser if the destination is a valid queue.
+     */
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
         if (isStrictAMQP())
@@ -963,7 +953,7 @@ public abstract class AMQSession<C exten
         checkNotClosed();
         checkValidQueue(queue);
 
-        return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
+        return new AMQQueueBrowser(this, queue, messageSelector);
     }
 
     protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -1043,7 +1033,7 @@ public abstract class AMQSession<C exten
         {
             try
             {
-                handleAddressBasedDestination(dest,false,true);
+                handleAddressBasedDestination(dest,false,noLocal,true);
                 if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
                 {
                     throw new JMSException("Durable subscribers can only be created for Topics");
@@ -1099,6 +1089,10 @@ public abstract class AMQSession<C exten
                     // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
                     // argument, as specifying null for the arguments when querying means they should not be checked at all
                     args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+                    if(noLocal)
+                    {
+                        args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true);
+                    }
 
                     // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
                     // says we must trash the subscription.
@@ -1159,7 +1153,7 @@ public abstract class AMQSession<C exten
     public MapMessage createMapMessage() throws JMSException
     {
         checkNotClosed();
-        if (USE_AMQP_ENCODED_MAP_MESSAGE)
+        if (_useAMQPEncodedMapMessage)
         {
             AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory());
             msg.setAMQSession(this);
@@ -1196,12 +1190,12 @@ public abstract class AMQSession<C exten
 
     public P createProducer(Destination destination) throws JMSException
     {
-        return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+        return createProducerImpl(destination, null, null);
     }
 
     public P createProducer(Destination destination, boolean immediate) throws JMSException
     {
-        return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+        return createProducerImpl(destination, null, immediate);
     }
 
     public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -1600,7 +1594,6 @@ public abstract class AMQSession<C exten
 
     public MessageListener getMessageListener() throws JMSException
     {
-        // checkNotClosed();
         return _messageListener;
     }
 
@@ -1648,6 +1641,7 @@ public abstract class AMQSession<C exten
         return (counter != null) && (counter.get() != 0);
     }
 
+    /** Indicates that warnings should be generated on violations of the strict AMQP. */
     public boolean isStrictAMQP()
     {
         return _strictAMQP;
@@ -1690,7 +1684,7 @@ public abstract class AMQSession<C exten
     {
         AMQProtocolHandler protocolHandler = getProtocolHandler();
         declareExchange(amqd, protocolHandler, false);
-        AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
+        AMQShortString queueName = declareQueue(amqd, false);
         bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
     }
 
@@ -1886,31 +1880,6 @@ public abstract class AMQSession<C exten
 
     public void setMessageListener(MessageListener listener) throws JMSException
     {
-        // checkNotClosed();
-        //
-        // if (_dispatcher != null && !_dispatcher.connectionStopped())
-        // {
-        // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started.");
-        // }
-        //
-        // // We are stopped
-        // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-        // {
-        // BasicMessageConsumer consumer = i.next();
-        //
-        // if (consumer.isReceiving())
-        // {
-        // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously.");
-        // }
-        // }
-        //
-        // _messageListener = listener;
-        //
-        // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-        // {
-        // i.next().setMessageListener(_messageListener);
-        // }
-
     }
     
     /**
@@ -2184,7 +2153,7 @@ public abstract class AMQSession<C exten
      */
     void markClosed()
     {
-        _closed.set(true);
+        setClosed();
         _connection.deregisterSession(_channelId);
         markClosedProducersAndConsumers();
 
@@ -2199,7 +2168,7 @@ public abstract class AMQSession<C exten
     {
         if (Thread.currentThread() == _dispatcherThread)
         {
-            while (!_closed.get() && !_queue.isEmpty())
+            while (!super.isClosed() && !_queue.isEmpty())
             {
                 Dispatchable disp;
                 try
@@ -2247,6 +2216,58 @@ public abstract class AMQSession<C exten
         }
     }
 
+    void drainDispatchQueue()
+    {
+        if (Thread.currentThread() == _dispatcherThread)
+        {
+            while (!super.isClosed() && !_queue.isEmpty())
+            {
+                Dispatchable disp;
+                try
+                {
+                    disp = (Dispatchable) _queue.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                // Check just in case _queue becomes empty, it shouldn't but
+                // better than an NPE.
+                if (disp == null)
+                {
+                    _logger.debug("_queue became empty during sync.");
+                    break;
+                }
+
+                disp.dispatch(AMQSession.this);
+            }
+        }
+        else
+        {
+            startDispatcherIfNecessary(false);
+
+            final CountDownLatch signal = new CountDownLatch(1);
+
+            _queue.add(new Dispatchable()
+            {
+                public void dispatch(AMQSession ssn)
+                {
+                    signal.countDown();
+                }
+            });
+
+            try
+            {
+                signal.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     /**
      * Resubscribes all producers and consumers. This is called when performing failover.
      *
@@ -2289,7 +2310,7 @@ public abstract class AMQSession<C exten
      */
     void start() throws AMQException
     {
-        // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
+        // Check if the session has previously been started and suspended, in which case it must be unsuspended.
         if (_startedAtLeastOnce.getAndSet(true))
         {
             suspendChannel(false);
@@ -2323,7 +2344,7 @@ public abstract class AMQSession<C exten
                 }
                 catch (AMQException e)
                 {
-                    _logger.info("Unsuspending channel threw an exception:" + e);
+                    _logger.info("Unsuspending channel threw an exception:", e);
                 }
             }
         }
@@ -2346,12 +2367,12 @@ public abstract class AMQSession<C exten
                 throw new Error("Error creating Dispatcher thread",e);
             }
             _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
-            _dispatcherThread.setDaemon(true);
+            _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
             _dispatcher.setConnectionStopped(initiallyStopped);
             _dispatcherThread.start();
-            if (_dispatcherLogger.isInfoEnabled())
+            if (_dispatcherLogger.isDebugEnabled())
             {
-                _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+                _dispatcherLogger.debug(_dispatcherThread.getName() + " created");
             }
         }
         else
@@ -2371,32 +2392,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /*
-     * Binds the named queue, with the specified routing key, to the named exchange.
-     *
-     * <p/>Note that this operation automatically retries in the event of fail-over.
-     *
-     * @param queueName    The name of the queue to bind.
-     * @param routingKey   The routing key to bind the queue with.
-     * @param arguments    Additional arguments.
-     * @param exchangeName The exchange to bind the queue on.
-     *
-     * @throws AMQException If the queue cannot be bound for any reason.
-     */
-    /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft)
-        throws AMQException, FailoverException
-    {
-        AMQFrame queueBind =
-            QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments
-                amqd.getExchangeName(), // exchange
-                false, // nowait
-                queueName, // queue
-                amqd.getRoutingKey(), // routingKey
-                getTicket()); // ticket
-
-        protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
-    }*/
-
     private void checkNotTransacted() throws JMSException
     {
         if (getTransacted())
@@ -2580,7 +2575,7 @@ public abstract class AMQSession<C exten
      * @param queueName
      */
     private void consumeFromQueue(C consumer, AMQShortString queueName,
-                                  AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
+                                  AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
 
@@ -2597,7 +2592,7 @@ public abstract class AMQSession<C exten
 
         try
         {
-            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
+            sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
         }
         catch (AMQException e)
         {
@@ -2608,9 +2603,9 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendConsume(C consumer, AMQShortString queueName,
-                                     AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
+                                     AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
 
-    private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+    private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
             throws JMSException
     {
         return new FailoverRetrySupport<P, JMSException>(
@@ -2639,8 +2634,8 @@ public abstract class AMQSession<C exten
                 }, _connection).execute();
     }
 
-    public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
-                                                               final boolean immediate, final long producerId) throws JMSException;
+    public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
+                                            final Boolean immediate, final long producerId) throws JMSException;
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
@@ -2661,18 +2656,38 @@ public abstract class AMQSession<C exten
     public long getQueueDepth(final AMQDestination amqd)
             throws AMQException
     {
-        return new FailoverNoopSupport<Long, AMQException>(
-                new FailoverProtectedOperation<Long, AMQException>()
-                {
-                    public Long execute() throws AMQException, FailoverException
-                    {
-                        return requestQueueDepth(amqd);
-                    }
-                }, _connection).execute();
+        return getQueueDepth(amqd, false);
+    }
 
+    /**
+     * Returns the number of messages currently queued for the given
+     * destination. Syncs the session before requesting the queue depth if
+     * sync is set to true.
+     *
+     * @param amqd AMQ destination whose queue depth is requested
+     * @param sync whether to sync the session before requesting the queue depth
+     * @return queue depth
+     * @throws AMQException if the queue depth request fails
+     */
+    public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException
+    {
+        return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>()
+        {
+            public Long execute() throws AMQException, FailoverException
+            {
+                try
+                {
+                    return requestQueueDepth(amqd, sync);
+                }
+                catch (TransportException e)
+                {
+                    throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+                }
+            }
+        }, _connection).execute();
     }
 
-    protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+    protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException;
 
     /**
      * Declares the named exchange and type of exchange.
@@ -2703,6 +2718,12 @@ public abstract class AMQSession<C exten
     public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
                                              final boolean nowait) throws AMQException, FailoverException;
 
+
+    void declareQueuePassive(AMQDestination queue) throws AMQException
+    {
+        declareQueue(queue,false,false,true);
+    }
+
     /**
      * Declares a queue for a JMS destination.
      *
@@ -2712,27 +2733,35 @@ public abstract class AMQSession<C exten
      *
      * <p/>Note that this operation automatically retries in the event of fail-over.
      *
-     * @param amqd            The destination to declare as a queue.
-     * @param protocolHandler The protocol handler to communicate through.
      *
+     * @param amqd            The destination to declare as a queue.
      * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
      *         the client.
      *
+     *
+     *
      * @throws AMQException If the queue cannot be declared for any reason.
      * @todo Verify the destiation is valid or throw an exception.
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+    protected AMQShortString declareQueue(final AMQDestination amqd,
                                           final boolean noLocal) throws AMQException
     {
-        return declareQueue(amqd, protocolHandler, noLocal, false);
+        return declareQueue(amqd, noLocal, false);
     }
 
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+    protected AMQShortString declareQueue(final AMQDestination amqd,
                                           final boolean noLocal, final boolean nowait)
+                throws AMQException
+    {
+        return declareQueue(amqd, noLocal, nowait, false);
+    }
+
+    protected AMQShortString declareQueue(final AMQDestination amqd,
+                                          final boolean noLocal, final boolean nowait, final boolean passive)
             throws AMQException
     {
-        /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
         return new FailoverNoopSupport<AMQShortString, AMQException>(
                 new FailoverProtectedOperation<AMQShortString, AMQException>()
                 {
@@ -2744,7 +2773,7 @@ public abstract class AMQSession<C exten
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        sendQueueDeclare(amqd, protocolHandler, nowait);
+                        sendQueueDeclare(amqd, protocolHandler, nowait, passive);
 
                         return amqd.getAMQQueueName();
                     }
@@ -2752,7 +2781,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean nowait) throws AMQException, FailoverException;
+                                          final boolean nowait, boolean passive) throws AMQException, FailoverException;
 
     /**
      * Undeclares the specified queue.
@@ -2882,18 +2911,18 @@ public abstract class AMQSession<C exten
 
         if (amqd.getDestSyntax() == DestSyntax.ADDR)
         {
-            handleAddressBasedDestination(amqd,true,nowait);            
+            handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
         }
         else
         {
-            if (DECLARE_EXCHANGES)
+            if (_declareExchanges)
             {
                 declareExchange(amqd, protocolHandler, nowait);
             }
     
-            if (DECLARE_QUEUES || amqd.isNameRequired())
+            if (_delareQueues || amqd.isNameRequired())
             {
-                declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+                declareQueue(amqd, consumer.isNoLocal(), nowait);
             }
             bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
         }
@@ -2916,24 +2945,24 @@ public abstract class AMQSession<C exten
                     try
                     {
                         suspendChannel(true);
-                        _logger.info(
+                        _logger.debug(
                                 "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
                     }
                     catch (AMQException e)
                     {
-                        _logger.info("Suspending channel threw an exception:" + e);
+                        _logger.info("Suspending channel threw an exception:", e);
                     }
                 }
             }
         }
         else
         {
-            _logger.info("Immediately prefetching existing messages to new consumer.");
+            _logger.debug("Immediately prefetching existing messages to new consumer.");
         }
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait);
         }
         catch (FailoverException e)
         {
@@ -2943,6 +2972,7 @@ public abstract class AMQSession<C exten
 
     public abstract void handleAddressBasedDestination(AMQDestination dest, 
                                                        boolean isConsumer,
+                                                       boolean noLocal,
                                                        boolean noWait) throws AMQException;
     
     private void registerProducer(long producerId, MessageProducer producer)
@@ -2959,18 +2989,18 @@ public abstract class AMQSession<C exten
     private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
     {
         Iterator messages = _queue.iterator();
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+            _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
                          + requeue);
 
             if (messages.hasNext())
             {
-                _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+                _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
             }
             else
             {
-                _logger.info("No messages in _queue to reject");
+                _logger.debug("No messages in _queue to reject");
             }
         }
         while (messages.hasNext())
@@ -3013,7 +3043,7 @@ public abstract class AMQSession<C exten
     private void resubscribeProducers() throws AMQException
     {
         ArrayList producers = new ArrayList(_producers.values());
-        _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
+        _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
         for (Iterator it = producers.iterator(); it.hasNext();)
         {
             P producer = (P) it.next();
@@ -3103,7 +3133,10 @@ public abstract class AMQSession<C exten
     public void setFlowControl(final boolean active)
     {
         _flowControl.setFlowControl(active);
-        _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+        }
     }
 
     public void checkFlowControl() throws InterruptedException, JMSException
@@ -3112,17 +3145,20 @@ public abstract class AMQSession<C exten
         synchronized (_flowControl)
         {
             while (!_flowControl.getFlowControl() &&
-                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
                                      : expiryTime) >= System.currentTimeMillis() )
             {
 
-                _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
-                _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+                _flowControl.wait(_flowControlWaitPeriod);
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+                }
             }
             if(!_flowControl.getFlowControl())
             {
                 _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
-                throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
+                throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
             }
         }
 
@@ -3154,7 +3190,7 @@ public abstract class AMQSession<C exten
         private final AtomicBoolean _closed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
-        private String dispatcherID = "" + System.identityHashCode(this);
+        private final String dispatcherID = "" + System.identityHashCode(this);
 
         public Dispatcher()
         {
@@ -3169,6 +3205,11 @@ public abstract class AMQSession<C exten
 
         }
 
+        private AtomicBoolean getClosed()
+        {
+            return _closed;
+        }
+
         public void rejectPending(C consumer)
         {
             synchronized (_lock)
@@ -3220,7 +3261,6 @@ public abstract class AMQSession<C exten
                     else
                     {
                         // should perhaps clear the _SQ here.
-                        // consumer._synchronousQueue.clear();
                         consumer.clearReceiveQueue();
                     }
 
@@ -3266,13 +3306,11 @@ public abstract class AMQSession<C exten
         
         public void run()
         {
-            if (_dispatcherLogger.isInfoEnabled())
+            if (_dispatcherLogger.isDebugEnabled())
             {
-                _dispatcherLogger.info(_dispatcherThread.getName() + " started");
+                _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
             }
 
-            UnprocessedMessage message;
-
             // Allow disptacher to start stopped
             synchronized (_lock)
             {
@@ -3284,7 +3322,7 @@ public abstract class AMQSession<C exten
                     }
                     catch (InterruptedException e)
                     {
-                        // ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
@@ -3299,12 +3337,12 @@ public abstract class AMQSession<C exten
             }
             catch (InterruptedException e)
             {
-                // ignore
+                // ignored as run will exit immediately
             }
 
-            if (_dispatcherLogger.isInfoEnabled())
+            if (_dispatcherLogger.isDebugEnabled())
             {
-                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
+                _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
             }
 
         }
@@ -3350,7 +3388,7 @@ public abstract class AMQSession<C exten
                 }
                 catch (InterruptedException e)
                 {
-                    // pass
+                    Thread.currentThread().interrupt();
                 }
 
                 if (!(message instanceof CloseConsumerMessage)
@@ -3425,7 +3463,7 @@ public abstract class AMQSession<C exten
                     if (_logger.isDebugEnabled())
                     {
                         _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
-                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag()));
                     }
                     rejectMessage(message, true);
                 }
@@ -3443,30 +3481,6 @@ public abstract class AMQSession<C exten
 
     public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
 
-    /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
-        boolean read) throws AMQException
-    {
-        getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
-                getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write),
-            new BlockingMethodFrameListener(_channelId)
-            {
-
-                public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException
-                {
-                    if (frame instanceof AccessRequestOkBody)
-                    {
-                        setTicket(((AccessRequestOkBody) frame).getTicket());
-
-                        return true;
-                    }
-                    else
-                    {
-                        return false;
-                    }
-                }
-            });
-    }*/
-
     private class SuspenderRunner implements Runnable
     {
         private AtomicBoolean _suspend;
@@ -3484,7 +3498,7 @@ public abstract class AMQSession<C exten
                 {
                     // If the session has closed by the time we get here
                     // then we should not attempt to write to the sesion/channel.
-                    if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                    if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                     {
                         suspendChannel(_suspend.get());
                     }
@@ -3492,11 +3506,11 @@ public abstract class AMQSession<C exten
             }
             catch (AMQException e)
             {
-                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e);
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Is the _queue empty?" + _queue.isEmpty());
-                    _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+                    _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed()));
                 }
             }
         }
@@ -3510,7 +3524,7 @@ public abstract class AMQSession<C exten
     @Override
     public boolean isClosed()
     {
-        return _closed.get() || _connection.isClosed();
+        return super.isClosed() || _connection.isClosed();
     }
 
     /**
@@ -3522,12 +3536,12 @@ public abstract class AMQSession<C exten
     @Override
     public boolean isClosing()
     {
-        return _closing.get()|| _connection.isClosing();
+        return super.isClosing() || _connection.isClosing();
     }
     
     public boolean isDeclareExchanges()
     {
-    	return DECLARE_EXCHANGES;
+    	return _declareExchanges;
     }
 
     JMSException toJMSException(String message, TransportException e)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java Sat Mar 10 19:22:10 2012
@@ -20,7 +20,172 @@
  */
 package org.apache.qpid.client;
 
-public interface AMQSessionAdapter
+import javax.jms.*;
+import java.io.Serializable;
+
+public abstract class AMQSessionAdapter<T extends Session> implements Session
 {
-    public AMQSession getSession();
+    private final T _session;
+
+    protected AMQSessionAdapter(final T session)
+    {
+        _session = session;
+    }
+
+    public T getSession()
+    {
+        return _session;
+    }
+
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        return _session.createBytesMessage();
+    }
+
+    public MapMessage createMapMessage() throws JMSException
+    {
+        return _session.createMapMessage();
+    }
+
+    public Message createMessage() throws JMSException
+    {
+        return _session.createMessage();
+    }
+
+    public ObjectMessage createObjectMessage() throws JMSException
+    {
+        return _session.createObjectMessage();
+    }
+
+    public ObjectMessage createObjectMessage(final Serializable serializable) throws JMSException
+    {
+        return _session.createObjectMessage(serializable);
+    }
+
+    public StreamMessage createStreamMessage() throws JMSException
+    {
+        return _session.createStreamMessage();
+    }
+
+    public TextMessage createTextMessage() throws JMSException
+    {
+        return _session.createTextMessage();
+    }
+
+    public TextMessage createTextMessage(final String s) throws JMSException
+    {
+        return _session.createTextMessage(s);
+    }
+
+    public boolean getTransacted() throws JMSException
+    {
+        return _session.getTransacted();
+    }
+
+    public int getAcknowledgeMode() throws JMSException
+    {
+        return _session.getAcknowledgeMode();
+    }
+
+    public void commit() throws JMSException
+    {
+        _session.commit();
+    }
+
+    public void rollback() throws JMSException
+    {
+        _session.rollback();
+    }
+
+    public void close() throws JMSException
+    {
+        _session.close();
+    }
+
+    public void recover() throws JMSException
+    {
+        _session.recover();
+    }
+
+    public MessageListener getMessageListener() throws JMSException
+    {
+        return _session.getMessageListener();
+    }
+
+    public void setMessageListener(final MessageListener messageListener) throws JMSException
+    {
+        _session.setMessageListener(messageListener);
+    }
+
+    public void run()
+    {
+        _session.run();
+    }
+
+    public MessageProducer createProducer(final Destination destination) throws JMSException
+    {
+        return _session.createProducer(destination);
+    }
+
+    public MessageConsumer createConsumer(final Destination destination) throws JMSException
+    {
+        return _session.createConsumer(destination);
+    }
+
+    public MessageConsumer createConsumer(final Destination destination, final String s) throws JMSException
+    {
+        return _session.createConsumer(destination, s);
+    }
+
+    public MessageConsumer createConsumer(final Destination destination, final String s, final boolean b)
+            throws JMSException
+    {
+        return _session.createConsumer(destination, s, b);
+    }
+
+    public Queue createQueue(final String s) throws JMSException
+    {
+        return _session.createQueue(s);
+    }
+
+    public Topic createTopic(final String s) throws JMSException
+    {
+        return _session.createTopic(s);
+    }
+
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String s) throws JMSException
+    {
+        return _session.createDurableSubscriber(topic, s);
+    }
+
+    public TopicSubscriber createDurableSubscriber(final Topic topic, final String s, final String s1, final boolean b)
+            throws JMSException
+    {
+        return _session.createDurableSubscriber(topic, s, s1, b);
+    }
+
+    public QueueBrowser createBrowser(final Queue queue) throws JMSException
+    {
+        return _session.createBrowser(queue);
+    }
+
+    public QueueBrowser createBrowser(final Queue queue, final String s) throws JMSException
+    {
+        return _session.createBrowser(queue, s);
+    }
+
+    public TemporaryQueue createTemporaryQueue() throws JMSException
+    {
+        return _session.createTemporaryQueue();
+    }
+
+    public TemporaryTopic createTemporaryTopic() throws JMSException
+    {
+        return _session.createTemporaryTopic();
+    }
+
+    public void unsubscribe(final String s) throws JMSException
+    {
+        _session.unsubscribe(s);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sat Mar 10 19:22:10 2012
@@ -17,11 +17,6 @@
  */
 package org.apache.qpid.client;
 
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,10 +29,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import javax.jms.Destination;
 import javax.jms.JMSException;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.Binding;
@@ -55,11 +48,14 @@ import org.apache.qpid.client.messaging.
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
 import org.apache.qpid.util.Serial;
 import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
@@ -78,6 +74,7 @@ public class AMQSession_0_10 extends AMQ
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
 
     private static Timer timer = new Timer("ack-flusher", true);
+
     private static class Flusher extends TimerTask
     {
 
@@ -120,7 +117,7 @@ public class AMQSession_0_10 extends AMQ
     private AMQException _currentException;
 
     // a ref on the qpid connection
-    protected org.apache.qpid.transport.Connection _qpidConnection;
+    private org.apache.qpid.transport.Connection _qpidConnection;
 
     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
     private TimerTask flushTask = null;
@@ -163,7 +160,7 @@ public class AMQSession_0_10 extends AMQ
             _qpidSession = _qpidConnection.createSession(name,1);
         }
         _qpidSession.setSessionListener(this);
-        if (_transacted)
+        if (isTransacted())
         {
             _qpidSession.txSelect();
             _qpidSession.setTransacted(true);
@@ -214,6 +211,11 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
+    protected Connection getQpidConnection()
+    {
+        return _qpidConnection; // protected read access to the _qpidConnection field, which this same patch makes private
+    }
+
     //------- overwritten methods of class AMQSession
 
     void failoverPrep()
@@ -234,17 +236,17 @@ public class AMQSession_0_10 extends AMQ
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId());
         }
         // acknowledge this message
         if (multiple)
         {
-            for (Long messageTag : _unacknowledgedMessageTags)
+            for (Long messageTag : getUnacknowledgedMessageTags())
             {
                 if( messageTag <= deliveryTag )
                 {
                     addUnacked(messageTag.intValue());
-                    _unacknowledgedMessageTags.remove(messageTag);
+                    getUnacknowledgedMessageTags().remove(messageTag);
                 }
             }
             //empty the list of unack messages
@@ -253,12 +255,12 @@ public class AMQSession_0_10 extends AMQ
         else
         {
             addUnacked((int) deliveryTag);
-            _unacknowledgedMessageTags.remove(deliveryTag);
+            getUnacknowledgedMessageTags().remove(deliveryTag);
         }
 
         long prefetch = getAMQConnection().getMaxPrefetch();
 
-        if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
+        if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE)
         {
             flushAcknowledgments();
         }
@@ -276,7 +278,7 @@ public class AMQSession_0_10 extends AMQ
             if (unackedCount > 0)
             {
                 messageAcknowledge
-                    (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
+                    (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
                 clearUnacked();
             }
         }
@@ -444,8 +446,8 @@ public class AMQSession_0_10 extends AMQ
     {
         // release all unacked messages
         RangeSet all = RangeSetFactory.createRangeSet();
-        RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
-        RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+        RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+        RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
         for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
         {
             Range range = deliveredIter.next();
@@ -526,9 +528,9 @@ public class AMQSession_0_10 extends AMQ
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
-        return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
-                                             _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
-                                             prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
+        return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+                getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
+                                             prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
     }
 
     /**
@@ -593,7 +595,7 @@ public class AMQSession_0_10 extends AMQ
      * Registers the consumer with the broker
      */
     public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
-                            boolean nowait, MessageFilter messageSelector, int tag)
+                            boolean nowait, int tag)
             throws AMQException, FailoverException
     {        
         boolean preAcquire = consumer.isPreAcquire();
@@ -630,7 +632,7 @@ public class AMQSession_0_10 extends AMQ
         getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
                                      Option.UNRELIABLE);
 
-        if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
+        if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch()))
         {
             // set the flow
             getQpidSession().messageFlow(consumerTag,
@@ -648,12 +650,12 @@ public class AMQSession_0_10 extends AMQ
     /**
      * Create an 0_10 message producer
      */
-    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
-                                                      final boolean immediate, final long producerId) throws JMSException
+    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
+                                                           final Boolean immediate, final long producerId) throws JMSException
     {
         try
         {
-            return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
+            return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
                                              getProtocolHandler(), producerId, immediate, mandatory);
         }
         catch (AMQException e)
@@ -719,7 +721,7 @@ public class AMQSession_0_10 extends AMQ
      * Declare a queue with the given queueName
      */
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait)
+                                 final boolean nowait, boolean passive)
             throws AMQException, FailoverException
     {
         // do nothing this is only used by 0_8
@@ -729,7 +731,7 @@ public class AMQSession_0_10 extends AMQ
      * Declare a queue with the given queueName
      */
     public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                               final boolean noLocal, final boolean nowait)
+                                               final boolean noLocal, final boolean nowait, boolean passive)
             throws AMQException
     {
         AMQShortString queueName;
@@ -755,13 +757,20 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
                                           amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                           amqd.isDurable() ? Option.DURABLE : Option.NONE,
-                                          amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
+                                          amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
+                                          passive ? Option.PASSIVE : Option.NONE);
         }
         else
         {
             QueueNode node = (QueueNode)amqd.getSourceNode();
+            Map<String,Object> arguments = new HashMap<String,Object>();
+            arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
+            if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
+            {
+                arguments.put(AddressHelper.NO_LOCAL, noLocal);
+            }
             getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
-                    node.getDeclareArgs(),
+                    arguments,
                     node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                     node.isDurable() ? Option.DURABLE : Option.NONE,
                     node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
@@ -795,15 +804,16 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
+            for (BasicMessageConsumer consumer : getConsumers().values())
             {
                 getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
                                              Option.UNRELIABLE);
             }
+            sync();
         }
         else
         {
-            for (BasicMessageConsumer_0_10 consumer : _consumers.values())
+            for (BasicMessageConsumer_0_10 consumer : getConsumers().values())
             {
                 String consumerTag = String.valueOf(consumer.getConsumerTag());
                 //only set if msg list is null
@@ -918,11 +928,12 @@ public class AMQSession_0_10 extends AMQ
         return getCurrentException();
     }
 
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean noLocal, final boolean nowait)
+    protected AMQShortString declareQueue(final AMQDestination amqd,
+                                          final boolean noLocal, final boolean nowait, final boolean passive)
             throws AMQException
     {
-        /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
+
         return new FailoverNoopSupport<AMQShortString, AMQException>(
                 new FailoverProtectedOperation<AMQShortString, AMQException>()
                 {
@@ -939,14 +950,18 @@ public class AMQSession_0_10 extends AMQ
                             amqd.setQueueName(new AMQShortString( binddingKey + "@"
                                     + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
                         }
-                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
+                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
                     }
-                }, _connection).execute();
+                }, getAMQConnection()).execute();
     }
 
-    protected Long requestQueueDepth(AMQDestination amqd)
+    protected Long requestQueueDepth(AMQDestination amqd, boolean sync)
     {
         flushAcknowledgments();
+        if (sync)
+        {
+            getQpidSession().sync(); // only when the caller passes sync=true: sync the session before the queue query below
+        }
         return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
     }
 
@@ -968,8 +983,8 @@ public class AMQSession_0_10 extends AMQ
     protected void sendTxCompletionsIfNecessary()
     {
         // this is a heuristic, we may want to have that configurable
-        if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
-                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
+        if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 ||
+                getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0))
         {
             // send completed so consumer credits don't dry up
             messageAcknowledge(_txRangeSet, false);
@@ -1039,7 +1054,7 @@ public class AMQSession_0_10 extends AMQ
             AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
             _currentException = amqe;
         }
-        _connection.exceptionReceived(_currentException);
+        getAMQConnection().exceptionReceived(_currentException);
     }
 
     public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1156,13 +1171,14 @@ public class AMQSession_0_10 extends AMQ
     @SuppressWarnings("deprecation")
     public void handleAddressBasedDestination(AMQDestination dest, 
                                               boolean isConsumer,
+                                              boolean noLocal,
                                               boolean noWait) throws AMQException
     {
-        if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+        if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
         {
             if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
             {
-                createSubscriptionQueue(dest);
+                createSubscriptionQueue(dest,noLocal);
             }
         }
         else
@@ -1191,7 +1207,7 @@ public class AMQSession_0_10 extends AMQ
                     else if(createNode)
                     {
                         setLegacyFiledsForQueueType(dest);
-                        send0_10QueueDeclare(dest,null,false,noWait);
+                        send0_10QueueDeclare(dest,null,noLocal,noWait, false);
                         sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
                                       null,dest.getExchangeName(),dest, false);
                         break;
@@ -1206,7 +1222,7 @@ public class AMQSession_0_10 extends AMQ
                         verifySubject(dest);
                         if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
                         {  
-                            createSubscriptionQueue(dest);
+                            createSubscriptionQueue(dest, noLocal);
                         }
                         break;
                     }
@@ -1221,7 +1237,7 @@ public class AMQSession_0_10 extends AMQ
                                 false);        
                         if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
                         {
-                            createSubscriptionQueue(dest);
+                            createSubscriptionQueue(dest,noLocal);
                         }
                         break;
                     }
@@ -1284,7 +1300,7 @@ public class AMQSession_0_10 extends AMQ
         }
     }
     
-    private void createSubscriptionQueue(AMQDestination dest) throws AMQException
+    private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
     {
         QueueNode node = (QueueNode)dest.getSourceNode();  // source node is never null
         
@@ -1297,11 +1313,11 @@ public class AMQSession_0_10 extends AMQ
         }
         node.setExclusive(true);
         node.setAutoDelete(!node.isDurable());
-        send0_10QueueDeclare(dest,null,false,true);
-        node.addBinding(new Binding(dest.getAddressName(),
-                                    dest.getQueueName(),// should have one by now
-                                    dest.getSubject(),
-                                    Collections.<String,Object>emptyMap()));
+        send0_10QueueDeclare(dest,null,noLocal,true, false);
+        getQpidSession().exchangeBind(dest.getQueueName(), 
+        		              dest.getAddressName(), 
+        		              dest.getSubject(), 
+        		              Collections.<String,Object>emptyMap());
         sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
                 null,dest.getExchangeName(),dest, false);
     }
@@ -1328,7 +1344,7 @@ public class AMQSession_0_10 extends AMQ
     
     protected void acknowledgeImpl()
     {
-        RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
+        RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
 
         if(ranges.size() > 0 )
         {
@@ -1344,15 +1360,53 @@ public class AMQSession_0_10 extends AMQ
         // return the first <total number of msgs received on session>
         // messages sent by the brokers following the first rollback
         // after failover
-        _highestDeliveryTag.set(-1);
+        getHighestDeliveryTag().set(-1);
         // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
         //messages that came from the old broker.
         _txRangeSet.clear();
         _txSize = 0;
-        _unacknowledgedMessageTags.clear();
-        _prefetchedMessageTags.clear();
+        getUnacknowledgedMessageTags().clear();
+        getPrefetchedMessageTags().clear();
         super.resubscribe();
         getQpidSession().sync();
     }
+
+    @Override
+    void stop() throws AMQException
+    {
+        super.stop();
+        setUsingDispatcherForCleanup(true); // flag is raised only around drainDispatchQueue(); NOTE(review): no try/finally, so it stays true if the drain throws
+        drainDispatchQueue();
+        setUsingDispatcherForCleanup(false);
+
+        for (BasicMessageConsumer consumer : getConsumers().values())
+        {
+            List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+            getPrefetchedMessageTags().addAll(tags); // tags returned by each consumer are added to the prefetched tags
+        }
+        
+        RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+		RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
+		RangeSet all = RangeSetFactory.createRangeSet(delivered.size() // argument is the combined size of both range sets
+					+ prefetched.size());
+
+		for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+		{
+			Range range = deliveredIter.next();
+			all.add(range);
+		}
+
+		for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+		{
+			Range range = prefetchedIter.next();
+			all.add(range);
+		}
+
+		flushProcessed(all, false); // called on the union of delivered and prefetched ranges; presumably marks them processed - confirm against flushProcessed
+		getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); // delivered ranges are released with SET_REDELIVERED
+		getQpidSession().messageRelease(prefetched); // prefetched ranges are released without that option
+		sync();
+    }
+
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org