You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 16:23:22 UTC

svn commit: r1632583 [2/5] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/...

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 17 14:23:19 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static org.apache.qpid.transport.util.Functions.hex;
+
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.PrivilegedAction;
@@ -47,14 +49,8 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
@@ -62,6 +58,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -69,6 +66,7 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -85,13 +83,18 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -103,12 +106,18 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel<T extends AMQProtocolSession<T>>
-        implements AMQSessionModel<AMQChannel<T>,T>,
-                   AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel
+        implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
+                   AsyncAutoCommitTransaction.FutureRecorder,
+                   ServerChannelMethodProcessor
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -159,7 +168,7 @@ public class AMQChannel<T extends AMQPro
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private final T _session;
+    private final AMQProtocolEngine _connection;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
     private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -181,8 +190,8 @@ public class AMQChannel<T extends AMQPro
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
-    private final List<Action<? super AMQChannel<T>>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+    private final List<Action<? super AMQChannel>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQChannel>>();
 
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -194,15 +203,14 @@ public class AMQChannel<T extends AMQPro
     private Session<?> _modelObject;
 
 
-    public AMQChannel(T session, int channelId, final MessageStore messageStore)
-            throws AMQException
+    public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
     {
-        _session = session;
+        _connection = connection;
         _channelId = channelId;
 
-        _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
-                               session.getAuthorizedSubject().getPublicCredentials(),
-                               session.getAuthorizedSubject().getPrivateCredentials());
+        _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
+                               connection.getAuthorizedSubject().getPublicCredentials(),
+                               connection.getAuthorizedSubject().getPrivateCredentials());
         _subject.getPrincipals().add(new SessionPrincipal(this));
         _logSubject = new ChannelLogSubject(this);
 
@@ -211,7 +219,7 @@ public class AMQChannel<T extends AMQPro
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
-        _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+        _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
 
         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
         {
@@ -242,6 +250,52 @@ public class AMQChannel<T extends AMQPro
 
     }
 
+    private boolean performGet(final AMQQueue queue,
+                               final boolean acks)
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+    {
+
+        final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+        final GetDeliveryMethod getDeliveryMethod =
+                new GetDeliveryMethod(singleMessageCredit, queue);
+        final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final ConsumerImpl sub,
+                                              final MessageInstance entry,
+                                              final long deliveryTag)
+            {
+                addUnacknowledgedMessage(entry, deliveryTag, null);
+            }
+        };
+
+        ConsumerTarget_0_8 target;
+        EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+                                                          ConsumerImpl.Option.SEES_REQUEUES);
+        if (acks)
+        {
+
+            target = ConsumerTarget_0_8.createAckTarget(this,
+                                                        AMQShortString.EMPTY_STRING, null,
+                                                        singleMessageCredit, getDeliveryMethod, getRecordMethod);
+        }
+        else
+        {
+            target = ConsumerTarget_0_8.createGetNoAckTarget(this,
+                                                             AMQShortString.EMPTY_STRING, null,
+                                                             singleMessageCredit, getDeliveryMethod, getRecordMethod);
+        }
+
+        ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+        sub.flush();
+        sub.close();
+        return getDeliveryMethod.hasDeliveredMessage();
+
+
+    }
+
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
@@ -250,7 +304,7 @@ public class AMQChannel<T extends AMQPro
             @Override
             public long getActivityTime()
             {
-                return _session.getLastReceivedTime();
+                return _connection.getLastReceivedTime();
             }
         });
         _txnStarts.incrementAndGet();
@@ -324,27 +378,18 @@ public class AMQChannel<T extends AMQPro
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-            throws AMQException
     {
-        if (_currentMessage == null)
+        if (_logger.isDebugEnabled())
         {
-            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+            _logger.debug("Content header received on channel " + _channelId);
         }
-        else
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Content header received on channel " + _channelId);
-            }
 
-            _currentMessage.setContentHeaderBody(contentHeaderBody);
+        _currentMessage.setContentHeaderBody(contentHeaderBody);
 
-            deliverCurrentMessageIfComplete();
-        }
+        deliverCurrentMessageIfComplete();
     }
 
     private void deliverCurrentMessageIfComplete()
-            throws AMQException
     {
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
@@ -355,7 +400,7 @@ public class AMQChannel<T extends AMQPro
                 final MessageMetaData messageMetaData =
                         new MessageMetaData(_currentMessage.getMessagePublishInfo(),
                                             _currentMessage.getContentHeader(),
-                                            getProtocolSession().getLastReceivedTime());
+                                            getConnection().getLastReceivedTime());
 
                 final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
                 final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
@@ -430,7 +475,7 @@ public class AMQChannel<T extends AMQPro
             {
                 long bodySize = _currentMessage.getSize();
                 long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
-                _session.registerMessageReceived(bodySize, timestamp);
+                _connection.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
         }
@@ -443,13 +488,13 @@ public class AMQChannel<T extends AMQPro
      * Pre-requisite: the current message is judged to have no destination queues.
      *
      * @throws AMQConnectionException if the message is mandatory close-on-no-route
-     * @see AMQProtocolSession#isCloseWhenNoRoute()
+     * @see AMQProtocolEngine#isCloseWhenNoRoute()
      */
-    private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
+    private void handleUnroutableMessage(AMQMessage message)
     {
         boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
-        boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+        boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
 
         if(_logger.isDebugEnabled())
         {
@@ -458,29 +503,29 @@ public class AMQChannel<T extends AMQPro
                     description, mandatory, isTransactional(), closeOnNoRoute));
         }
 
-        if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
-        {
-            throw new AMQConnectionException(
-                    AMQConstant.NO_ROUTE,
-                    "No route for message " + currentMessageDescription(),
-                    0, 0, // default class and method ids
-                    getProtocolSession().getProtocolVersion().getMajorVersion(),
-                    getProtocolSession().getProtocolVersion().getMinorVersion(),
-                    (Throwable) null);
-        }
-
-        if (mandatory || message.isImmediate())
+        if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
         {
-            _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
+            _connection.closeConnection(AMQConstant.NO_ROUTE,
+                    "No route for message " + currentMessageDescription(), _channelId);
         }
         else
         {
-            AMQShortString exchangeName = _currentMessage.getExchangeName();
-            AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
+            if (mandatory || message.isImmediate())
+            {
+                _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
+                                                                            "No Route for message "
+                                                                            + currentMessageDescription(),
+                                                                            message));
+            }
+            else
+            {
+                AMQShortString exchangeName = _currentMessage.getExchangeName();
+                AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
 
-            getVirtualHost().getEventLogger().message(
-                    ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
-                                                routingKey == null ? null : routingKey.asString()));
+                getVirtualHost().getEventLogger().message(
+                        ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
+                                                    routingKey == null ? null : routingKey.asString()));
+            }
         }
     }
 
@@ -499,13 +544,8 @@ public class AMQChannel<T extends AMQPro
                         : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
     }
 
-    public void publishContentBody(ContentBody contentBody) throws AMQException
+    public void publishContentBody(ContentBody contentBody)
     {
-        if (_currentMessage == null)
-        {
-            throw new AMQException("Received content body without previously receiving a Content Header");
-        }
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
@@ -517,13 +557,6 @@ public class AMQChannel<T extends AMQPro
 
             deliverCurrentMessageIfComplete();
         }
-        catch (AMQException e)
-        {
-            // we want to make sure we don't keep a reference to the message in the
-            // event of an error
-            _currentMessage = null;
-            throw e;
-        }
         catch (RuntimeException e)
         {
             // we want to make sure we don't keep a reference to the message in the
@@ -566,9 +599,10 @@ public class AMQChannel<T extends AMQPro
      */
     public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
                                             FieldTable filters, boolean exclusive, boolean noLocal)
-            throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
-                   MessageSource.ConsumerAccessRefused
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer,
+                   AMQInvalidArgumentException,
+                   MessageSource.ConsumerAccessRefused, ConsumerTagInUseException
     {
         if (tag == null)
         {
@@ -577,7 +611,7 @@ public class AMQChannel<T extends AMQPro
 
         if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
-            throw new AMQException("Consumer already exists with same tag: " + tag);
+            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
         }
 
         ConsumerTarget_0_8 target;
@@ -649,27 +683,11 @@ public class AMQChannel<T extends AMQPro
                 }
             }
         }
-        catch (AccessControlException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingExclusiveConsumer e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingConsumerPreventsExclusive e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ConsumerAccessRefused e)
+        catch (AccessControlException
+                | MessageSource.ExistingExclusiveConsumer
+                | MessageSource.ExistingConsumerPreventsExclusive
+                | AMQInvalidArgumentException
+                | MessageSource.ConsumerAccessRefused e)
         {
             _tag2SubscriptionTargetMap.remove(tag);
             throw e;
@@ -730,7 +748,7 @@ public class AMQChannel<T extends AMQPro
 
         unsubscribeAllConsumers();
 
-        for (Action<? super AMQChannel<T>> task : _taskList)
+        for (Action<? super AMQChannel> task : _taskList)
         {
             task.performAction(this);
         }
@@ -897,9 +915,8 @@ public class AMQChannel<T extends AMQPro
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
-     * @throws AMQException When something goes wrong.
      */
-    public void resend() throws AMQException
+    public void resend()
     {
 
 
@@ -985,9 +1002,8 @@ public class AMQChannel<T extends AMQPro
      * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
      *                    acknowledges the single message specified by the delivery tag
      *
-     * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1084,22 +1100,13 @@ public class AMQChannel<T extends AMQPro
 
     public boolean isSuspended()
     {
-        return _suspended.get()  || _closing.get() || _session.isClosing();
+        return _suspended.get()  || _closing.get() || _connection.isClosing();
     }
 
-    public void commit() throws AMQException
-    {
-        commit(null, false);
-    }
 
-
-    public void commit(final Runnable immediateAction, boolean async) throws AMQException
+    public void commit(final Runnable immediateAction, boolean async)
     {
 
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on non-transactional channel");
-        }
 
         if(async && _transaction instanceof LocalTransaction)
         {
@@ -1132,17 +1139,8 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void rollback() throws AMQException
-    {
-        rollback(NULL_TASK);
-    }
-
-    public void rollback(Runnable postRollbackTask) throws AMQException
+    public void rollback(Runnable postRollbackTask)
     {
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on non-transactional channel");
-        }
 
         // stop all subscriptions
         _rollingBack = true;
@@ -1200,7 +1198,7 @@ public class AMQChannel<T extends AMQPro
 
     public String toString()
     {
-        return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+        return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
     }
 
     public void setDefaultQueue(AMQQueue queue)
@@ -1219,9 +1217,9 @@ public class AMQChannel<T extends AMQPro
         return _closing.get();
     }
 
-    public AMQProtocolSession getProtocolSession()
+    public AMQProtocolEngine getConnection()
     {
-        return _session;
+        return _connection;
     }
 
     public FlowCreditManager getCreditManager()
@@ -1261,13 +1259,9 @@ public class AMQChannel<T extends AMQPro
 
 
     private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
-            throws AMQException
     {
 
-        AMQMessage message = new AMQMessage(handle, _session.getReference());
-
-        final BasicContentHeaderProperties properties =
-                  incomingMessage.getContentHeader().getProperties();
+        AMQMessage message = new AMQMessage(handle, _connection.getReference());
 
         return message;
     }
@@ -1275,7 +1269,7 @@ public class AMQChannel<T extends AMQPro
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
         AMQShortString userID = header.getProperties().getUserId();
-        return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+        return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
 
     }
 
@@ -1286,14 +1280,14 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public T getConnectionModel()
+    public AMQProtocolEngine getConnectionModel()
     {
-        return _session;
+        return _connection;
     }
 
     public String getClientID()
     {
-        return String.valueOf(_session.getContextKey());
+        return String.valueOf(_connection.getContextKey());
     }
 
     public LogSubject getLogSubject()
@@ -1308,13 +1302,13 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void addDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.add(task);
     }
 
     @Override
-    public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void removeDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.remove(task);
     }
@@ -1324,6 +1318,46 @@ public class AMQChannel<T extends AMQPro
         return _subject;
     }
 
+    public boolean hasCurrentMessage()
+    {
+        return _currentMessage != null;
+    }
+
+    private class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQQueue _queue;
+        private boolean _deliveredMessage;
+
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQQueue queue)
+        {
+            _singleMessageCredit = singleMessageCredit;
+            _queue = queue;
+        }
+
+        @Override
+        public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+                                    final InstanceProperties props, final long deliveryTag)
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+            long size = _connection.getProtocolOutputConverter().writeGetOk(message,
+                                                                            props,
+                                                                            AMQChannel.this.getChannelId(),
+                                                                            deliveryTag,
+                                                                            _queue.getQueueDepthMessages());
+
+            _deliveredMessage = true;
+            return size;
+        }
+
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
+
 
     private class ImmediateAction implements Action<MessageInstance>
     {
@@ -1352,7 +1386,7 @@ public class AMQChannel<T extends AMQPro
                                     public void postCommit()
                                     {
                                         final ProtocolOutputConverter outputConverter =
-                                                    _session.getProtocolOutputConverter();
+                                                    _connection.getProtocolOutputConverter();
 
                                         outputConverter.writeReturn(message.getMessagePublishInfo(),
                                                                     message.getContentHeaderBody(),
@@ -1475,7 +1509,7 @@ public class AMQChannel<T extends AMQPro
         public void postCommit()
         {
             AMQMessage message = _reference.getMessage();
-            _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+            _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
                                                           message.getContentHeaderBody(),
                                                           message,
                                                           _channelId,
@@ -1544,7 +1578,7 @@ public class AMQChannel<T extends AMQPro
     @Override
     public Object getConnectionReference()
     {
-        return getProtocolSession().getReference();
+        return getConnection().getReference();
     }
 
     public int getUnacknowledgedMessageCount()
@@ -1554,9 +1588,9 @@ public class AMQChannel<T extends AMQPro
 
     private void flow(boolean flow)
     {
-        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
         AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
-        _session.writeFrame(responseBody.generateFrame(_channelId));
+        _connection.writeFrame(responseBody.generateFrame(_channelId));
     }
 
     @Override
@@ -1567,7 +1601,7 @@ public class AMQChannel<T extends AMQPro
 
     public VirtualHostImpl getVirtualHost()
     {
-        return getProtocolSession().getVirtualHost();
+        return getConnection().getVirtualHost();
     }
 
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -1581,11 +1615,11 @@ public class AMQChannel<T extends AMQPro
      */
     private void closeConnection(String reason) throws AMQException
     {
-        Lock receivedLock = _session.getReceivedLock();
+        Lock receivedLock = _connection.getReceivedLock();
         receivedLock.lock();
         try
         {
-            _session.close(AMQConstant.RESOURCE_ERROR, reason);
+            _connection.close(AMQConstant.RESOURCE_ERROR, reason);
         }
         finally
         {
@@ -1593,7 +1627,7 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void deadLetter(long deliveryTag) throws AMQException
+    public void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
         final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1816,4 +1850,1521 @@ public class AMQChannel<T extends AMQPro
             return 0L;
         }
     }
+
+    @Override
+    public void receiveAccessRequest(final AMQShortString realm,
+                                     final boolean exclusive,
+                                     final boolean passive,
+                                     final boolean active, final boolean write, final boolean read)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm +
+                          " exclusive: " + exclusive +
+                          " passive: " + passive +
+                          " active: " + active +
+                          " write: " + write + " read: " + read + " ]");
+        }
+
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+        if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                                    "AccessRequest not present in AMQP versions other than 0-8, 0-9",
+                                                    _channelId);
+        }
+        else
+        {
+            // We don't implement access control class, but to keep clients happy that expect it
+            // always use the "0" ticket.
+            AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+            sync();
+            _connection.writeFrame(response.generateFrame(_channelId));
+        }
+    }
+
+    @Override
+    public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
+        }
+
+        acknowledgeMessage(deliveryTag, multiple);
+    }
+
+    @Override
+    public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
+    {
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]");
+        }
+
+        unsubscribeConsumer(consumerTag);
+        if (!nowait)
+        {
+            MethodRegistry methodRegistry = _connection.getMethodRegistry();
+            BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+            sync();
+            _connection.writeFrame(cancelOkBody.generateFrame(_channelId));
+        }
+    }
+
+    @Override
+    public void receiveBasicConsume(final AMQShortString queue,
+                                    final AMQShortString consumerTag,
+                                    final boolean noLocal,
+                                    final boolean noAck,
+                                    final boolean exclusive, final boolean nowait, final FieldTable arguments)
+    {
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
+                          " consumerTag: " + consumerTag +
+                          " noLocal: " + noLocal +
+                          " noAck: " + noAck +
+                          " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
+        }
+
+        AMQShortString consumerTag1 = consumerTag;
+        VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+        sync();
+        String queueName = queue == null ? null : queue.asString();
+
+        MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName);
+        final Collection<MessageSource> sources = new HashSet<>();
+        if (queue1 != null)
+        {
+            sources.add(queue1);
+        }
+        else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+                 && arguments != null
+                 && arguments.get("x-multiqueue") instanceof Collection)
+        {
+            for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
+            {
+                String sourceName = String.valueOf(object);
+                sourceName = sourceName.trim();
+                if (sourceName.length() != 0)
+                {
+                    MessageSource source = vHost.getMessageSource(sourceName);
+                    if (source == null)
+                    {
+                        sources.clear();
+                        break;
+                    }
+                    else
+                    {
+                        sources.add(source);
+                    }
+                }
+            }
+            queueName = arguments.get("x-multiqueue").toString();
+        }
+
+        if (sources.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("No queue for '" + queueName + "'");
+            }
+            if (queueName != null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+            }
+            else
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "No queue name provided, no default queue defined.", _channelId);
+            }
+        }
+        else
+        {
+            try
+            {
+                consumerTag1 = consumeFromSource(consumerTag1,
+                                                 sources,
+                                                 !noAck,
+                                                 arguments,
+                                                 exclusive,
+                                                 noLocal);
+                if (!nowait)
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
+                    _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+                }
+            }
+            catch (ConsumerTagInUseException cte)
+            {
+
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "Non-unique consumer tag, '" + consumerTag1
+                                            + "'", _channelId);
+            }
+            catch (AMQInvalidArgumentException ise)
+            {
+                _connection.closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId);
+
+
+            }
+            catch (AMQQueue.ExistingExclusiveConsumer e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                                                    + queue1.getName()
+                                                                    + " as it already has an existing exclusive consumer", _channelId);
+
+            }
+            catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                                                    + queue1.getName()
+                                                                    + " exclusively as it already has a consumer", _channelId);
+
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+                                                                    + queue1.getName()
+                                                                    + " permission denied", _channelId);
+
+            }
+            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+                                            "Cannot subscribe to queue "
+                                                                    + queue1.getName()
+                                                                    + " as it already has an incompatible exclusivity policy", _channelId);
+
+            }
+
+        }
+    }
+
+    @Override
+    public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]");
+        }
+
+        VirtualHostImpl vHost = _connection.getVirtualHost();
+        sync();
+        AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString());
+        if (queue == null)
+        {
+            _logger.info("No queue for '" + queueName + "'");
+            if (queueName != null)
+            {
+                _connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
+
+            }
+            else
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "No queue name provided, no default queue defined.", _channelId);
+
+            }
+        }
+        else
+        {
+
+            try
+            {
+                if (!performGet(queue, !noAck))
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+                    BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+                    _connection.writeFrame(responseBody.generateFrame(_channelId));
+                }
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId);
+            }
+            catch (MessageSource.ExistingExclusiveConsumer e)
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
+            }
+            catch (MessageSource.ExistingConsumerPreventsExclusive e)
+            {
+                _connection.closeConnection(AMQConstant.INTERNAL_ERROR,
+                                            "The GET request has been evaluated as an exclusive consumer, " +
+                                        "this is likely due to a programming error in the Qpid broker", _channelId);
+            }
+            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "Queue has an incompatible exclusivity policy", _channelId);
+            }
+        }
+    }
+
+    @Override
+    public void receiveBasicPublish(final AMQShortString exchangeName,
+                                    final AMQShortString routingKey,
+                                    final boolean mandatory,
+                                    final boolean immediate)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName +
+                          " routingKey: " + routingKey +
+                          " mandatory: " + mandatory +
+                          " immediate: " + immediate + " ]");
+        }
+
+        VirtualHostImpl vHost = _connection.getVirtualHost();
+
+        MessageDestination destination;
+
+        if (isDefaultExchange(exchangeName))
+        {
+            destination = vHost.getDefaultDestination();
+        }
+        else
+        {
+            destination = vHost.getMessageDestination(exchangeName.toString());
+        }
+
+        // if the exchange does not exist we raise a channel exception
+        if (destination == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+        }
+        else
+        {
+
+            MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+                                                             immediate,
+                                                             mandatory,
+                                                             routingKey);
+
+            try
+            {
+                setPublishFrame(info, destination);
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+            }
+        }
+    }
+
+    @Override
+    public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]");
+        }
+
+        sync();
+        setCredit(prefetchSize, prefetchCount);
+
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+    }
+
+    @Override
+    public void receiveBasicRecover(final boolean requeue, final boolean sync)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
+        }
+
+        resend();
+
+        if (sync)
+        {
+            MethodRegistry methodRegistry = _connection.getMethodRegistry();
+            AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+            sync();
+            _connection.writeFrame(recoverOk.generateFrame(getChannelId()));
+
+        }
+
+    }
+
+    @Override
+    public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
+        }
+
+        MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag);
+
+        if (message == null)
+        {
+            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+        }
+        else
+        {
+
+            if (message.getMessage() == null)
+            {
+                _logger.warn("Message has already been purged, unable to Reject.");
+            }
+            else
+            {
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Rejecting: DT:" + deliveryTag
+                                                             + "-" + message.getMessage() +
+                                  ": Requeue:" + requeue
+                                  +
+                                  " on channel:" + debugIdentity());
+                }
+
+                if (requeue)
+                {
+                    //this requeue represents a message rejected from the pre-dispatch queue
+                    //therefore we need to amend the delivery counter.
+                    message.decrementDeliveryCount();
+
+                    requeue(deliveryTag);
+                }
+                else
+                {
+                    // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+                    // as it would prevent redelivery
+                    // message.reject();
+
+                    final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+                    _logger.debug("maxDeliveryCountEnabled: "
+                                  + maxDeliveryCountEnabled
+                                  + " deliveryTag "
+                                  + deliveryTag);
+                    if (maxDeliveryCountEnabled)
+                    {
+                        final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+                        _logger.debug("deliveredTooManyTimes: "
+                                      + deliveredTooManyTimes
+                                      + " deliveryTag "
+                                      + deliveryTag);
+                        if (deliveredTooManyTimes)
+                        {
+                            deadLetter(deliveryTag);
+                        }
+                        else
+                        {
+                            //this requeue represents a message rejected because of a recover/rollback that we
+                            //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+                            //and therefore need to increment the delivery counter so we cancel out the effect
+                            //of the AMQChannel#resend() decrement.
+                            message.incrementDeliveryCount();
+                        }
+                    }
+                    else
+                    {
+                        requeue(deliveryTag);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void receiveChannelClose(final int replyCode,
+                                    final AMQShortString replyText,
+                                    final int classId,
+                                    final int methodId)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+        }
+
+
+        sync();
+        _connection.closeChannel(this);
+
+        _connection.writeFrame(new AMQFrame(getChannelId(),
+                                            _connection.getMethodRegistry().createChannelCloseOkBody()));
+    }
+
+    @Override
+    public void receiveChannelCloseOk()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ChannelCloseOk");
+        }
+
+        _connection.closeChannelOk(getChannelId());
+    }
+
+    @Override
+    public void receiveMessageContent(final byte[] data)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] ");
+        }
+
+        if(hasCurrentMessage())
+        {
+            publishContentBody(new ContentBody(data));
+        }
+        else
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content header without first sending a publish frame",
+                                        _channelId);
+        }
+    }
+
+    @Override
+    public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]");
+        }
+
+        if(hasCurrentMessage())
+        {
+            publishContentHeader(new ContentHeaderBody(properties, bodySize));
+        }
+        else
+        {
+            _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+                                        "Attempt to send a content header without first sending a publish frame",
+                                        _channelId);
+        }
+    }
+
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId);
+    }
+
+    @Override
+    public void receiveChannelFlow(final boolean active)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]");
+        }
+
+
+        sync();
+        setSuspended(!active);
+
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active);
+        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+    }
+
+    @Override
+    public void receiveChannelFlowOk(final boolean active)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]");
+        }
+
+        // TODO - should we do anything here?
+    }
+
+    @Override
+    public void receiveExchangeBound(final AMQShortString exchangeName,
+                                     final AMQShortString routingKey,
+                                     final AMQShortString queueName)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " +
+                          routingKey + " queue: " + queueName + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+        sync();
+
+        int replyCode;
+        String replyText;
+
+        if (isDefaultExchange(exchangeName))
+        {
+            if (routingKey == null)
+            {
+                if (queueName == null)
+                {
+                    replyCode = virtualHost.getQueues().isEmpty()
+                            ? ExchangeBoundOkBody.NO_BINDINGS
+                            : ExchangeBoundOkBody.OK;
+                    replyText = null;
+
+                }
+                else
+                {
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
+                    {
+                        replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                        replyText = "Queue '" + queueName + "' not found";
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.OK;
+                        replyText = null;
+                    }
+                }
+            }
+            else
+            {
+                if (queueName == null)
+                {
+                    replyCode = virtualHost.getQueue(routingKey.toString()) == null
+                            ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
+                            : ExchangeBoundOkBody.OK;
+                    replyText = null;
+                }
+                else
+                {
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
+                    {
+
+                        replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                        replyText = "Queue '" + queueName + "' not found";
+                    }
+                    else
+                    {
+                        replyCode = queueName.equals(routingKey)
+                                ? ExchangeBoundOkBody.OK
+                                : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                        replyText = null;
+                    }
+                }
+            }
+        }
+        else
+        {
+            ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+            if (exchange == null)
+            {
+
+                replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
+                replyText = "Exchange '" + exchangeName + "' not found";
+            }
+            else if (routingKey == null)
+            {
+                if (queueName == null)
+                {
+                    if (exchange.hasBindings())
+                    {
+                        replyCode = ExchangeBoundOkBody.OK;
+                        replyText = null;
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.NO_BINDINGS;
+                        replyText = null;
+                    }
+                }
+                else
+                {
+
+                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                    if (queue == null)
+                    {
+                        replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                        replyText = "Queue '" + queueName + "' not found";
+                    }
+                    else
+                    {
+                        if (exchange.isBound(queue))
+                        {
+                            replyCode = ExchangeBoundOkBody.OK;
+                            replyText = null;
+                        }
+                        else
+                        {
+                            replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
+                            replyText = "Queue '"
+                                        + queueName
+                                        + "' not bound to exchange '"
+                                        + exchangeName
+                                        + "'";
+                        }
+                    }
+                }
+            }
+            else if (queueName != null)
+            {
+                AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                if (queue == null)
+                {
+                    replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                    replyText = "Queue '" + queueName + "' not found";
+                }
+                else
+                {
+                    String bindingKey = routingKey == null ? null : routingKey.asString();
+                    if (exchange.isBound(bindingKey, queue))
+                    {
+
+                        replyCode = ExchangeBoundOkBody.OK;
+                        replyText = null;
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                        replyText = "Queue '" + queueName + "' not bound with routing key '" +
+                                    routingKey + "' to exchange '" + exchangeName + "'";
+
+                    }
+                }
+            }
+            else
+            {
+                if (exchange.isBound(routingKey == null ? "" : routingKey.asString()))
+                {
+
+                    replyCode = ExchangeBoundOkBody.OK;
+                    replyText = null;
+                }
+                else
+                {
+                    replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
+                    replyText =
+                            "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
+                }
+            }
+        }
+
+        ExchangeBoundOkBody exchangeBoundOkBody =
+                methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
+
+        _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
+
+    }
+
+    @Override
+    public void receiveExchangeDeclare(final AMQShortString exchangeName,
+                                       final AMQShortString type,
+                                       final boolean passive,
+                                       final boolean durable,
+                                       final boolean autoDelete,
+                                       final boolean internal,
+                                       final boolean nowait,
+                                       final FieldTable arguments)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName +
+                          " type: " + type +
+                          " passive: " + passive +
+                          " durable: " + durable +
+                          " autoDelete: " + autoDelete +
+                          " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
+        }
+
+        ExchangeImpl exchange;
+        VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+        if (isDefaultExchange(exchangeName))
+        {
+            if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+                                                                 + " of type "
+                                                                 + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+                                                                 + " to " + type + ".", getChannelId());
+            }
+            else if (!nowait)
+            {
+                MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+                sync();
+                _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+            }
+
+        }
+        else
+        {
+            if (passive)
+            {
+                exchange = virtualHost.getExchange(exchangeName.toString());
+                if (exchange == null)
+                {
+                    closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+                }
+                else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString()))
+                {
+
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                                         +
+                                                                         exchangeName
+                                                                         + " of type "
+                                                                         + exchange.getType()
+                                                                         + " to "
+                                                                         + type
+                                                                         + ".", getChannelId());
+                }
+                else if (!nowait)
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+                    sync();
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+
+            }
+            else
+            {
+                try
+                {
+                    String name = exchangeName == null ? null : exchangeName.intern().toString();
+                    String typeString = type == null ? null : type.intern().toString();
+
+                    Map<String, Object> attributes = new HashMap<String, Object>();
+                    if (arguments != null)
+                    {
+                        attributes.putAll(FieldTable.convertToMap(arguments));
+                    }
+                    attributes.put(Exchange.ID, null);
+                    attributes.put(Exchange.NAME, name);
+                    attributes.put(Exchange.TYPE, typeString);
+                    attributes.put(Exchange.DURABLE, durable);
+                    attributes.put(Exchange.LIFETIME_POLICY,
+                                   autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                    if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+                    {
+                        attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+                    }
+                    exchange = virtualHost.createExchange(attributes);
+
+                    if (!nowait)
+                    {
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+                        sync();
+                        _connection.writeFrame(responseBody.generateFrame(
+                                getChannelId()));
+                    }
+
+                }
+                catch (ReservedExchangeNameException e)
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                                "Attempt to declare exchange: " + exchangeName +
+                                                                         " which begins with reserved prefix.", getChannelId());
+
+
+                }
+                catch (ExchangeExistsException e)
+                {
+                    exchange = e.getExistingExchange();
+                    if (!new AMQShortString(exchange.getType()).equals(type))
+                    {
+                        _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                                                 + exchangeName + " of type "
+                                                                                 + exchange.getType()
+                                                                                 + " to " + type + ".", getChannelId());
+
+                    }
+                }
+                catch (NoFactoryForTypeException e)
+                {
+                    _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
+                                                                             + e.getType()
+                                                                             + "' for exchange '"
+                                                                             + exchangeName
+                                                                             + "'", getChannelId());
+
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+                }
+                catch (UnknownConfiguredObjectException e)
+                {
+                    // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+                    final String message = "Unknown alternate exchange "
+                                           + (e.getName() != null
+                            ? "name: \"" + e.getName() + "\""
+                            : "id: " + e.getId());
+                    _connection.closeConnection(AMQConstant.NOT_FOUND, message, getChannelId());
+
+                }
+                catch (IllegalArgumentException e)
+                {
+                    _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
+                                                                             + exchangeName
+                                                                             + "': "
+                                                                             + e.getMessage(), getChannelId());
+
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]");
+        }
+
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        sync();
+        try
+        {
+
+            if (isDefaultExchange(exchangeStr))
+            {
+                _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                            "Default Exchange cannot be deleted", getChannelId());
+
+            }
+
+            else
+            {
+                final String exchangeName = exchangeStr.toString();
+
+                final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+                if (exchange == null)
+                {
+                    closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
+                }
+                else
+                {
+                    virtualHost.removeExchange(exchange, !ifUnused);
+
+                    ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+            }
+        }
+        catch (ExchangeIsAlternateException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+        }
+        catch (RequiredExchangeException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED,
+                         "Exchange '" + exchangeStr + "' cannot be deleted");
+        }
+        catch (AccessControlException e)
+        {
+            _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+        }
+    }
+
+    @Override
+    public void receiveQueueBind(final AMQShortString queueName,
+                                 final AMQShortString exchange,
+                                 AMQShortString routingKey,
+                                 final boolean nowait,
+                                 final FieldTable argumentsTable)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName +
+                          " exchange: " + exchange +
+                          " bindingKey: " + routingKey +
+                          " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        AMQQueue<?> queue;
+        if (queueName == null)
+        {
+
+            queue = getDefaultQueue();
+
+            if (queue != null)
+            {
+                if (routingKey == null)
+                {
+                    routingKey = AMQShortString.valueOf(queue.getName());
+                }
+                else
+                {
+                    routingKey = routingKey.intern();
+                }
+            }
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+            routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
+        }
+
+        if (queue == null)
+        {
+            String message = queueName == null
+                    ? "No default queue defined on channel and queue was null"
+                    : "Queue " + queueName + " does not exist.";
+                closeChannel(AMQConstant.NOT_FOUND, message);
+        }
+        else if (isDefaultExchange(exchange))
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Cannot bind the queue " + queueName + " to the default exchange", getChannelId());
+
+        }
+        else
+        {
+
+            final String exchangeName = exchange.toString();
+
+            final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+            if (exch == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND,
+                             "Exchange " + exchangeName + " does not exist.");
+            }
+            else
+            {
+
+                try
+                {
+
+                    Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
+                    String bindingKey = String.valueOf(routingKey);
+
+                    if (!exch.isBound(bindingKey, arguments, queue))
+                    {
+
+                        if (!exch.addBinding(bindingKey, queue, arguments)
+                            && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
+                                exch.getType()))
+                        {
+                            exch.replaceBinding(bindingKey, queue, arguments);
+                        }
+                    }
+
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Binding queue "
+                                     + queue
+                                     + " to exchange "
+                                     + exch
+                                     + " with routing key "
+                                     + routingKey);
+                    }
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                    }
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+                }
+            }
+        }
+    }
+
+    @Override
+    public void receiveQueueDeclare(final AMQShortString queueStr,
+                                    final boolean passive,
+                                    final boolean durable,
+                                    final boolean exclusive,
+                                    final boolean autoDelete,
+                                    final boolean nowait,
+                                    final FieldTable arguments)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr +
+                          " passive: " + passive +
+                          " durable: " + durable +
+                          " exclusive: " + exclusive +
+                          " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+        final AMQShortString queueName;
+
+        // if we aren't given a queue name, we create one which we return to the client
+        if ((queueStr == null) || (queueStr.length() == 0))
+        {
+            queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+        }
+        else
+        {
+            queueName = queueStr.intern();
+        }
+
+        AMQQueue queue;
+
+        //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+
+        if (passive)
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+            if (queue == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND,
+                                                     "Queue: "
+                                                     + queueName
+                                                     + " not found on VirtualHost("
+                                                     + virtualHost
+                                                     + ").");
+            }
+            else
+            {
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not created on this Connection.", getChannelId());
+                }
+                else
+                {
+                    //set this as the default queue on the channel:
+                    setDefaultQueue(queue);
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        QueueDeclareOkBody responseBody =
+                                methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                        queue.getQueueDepthMessages(),
+                                                                        queue.getConsumerCount());
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                        _logger.info("Queue " + queueName + " declared successfully");
+                    }
+                }
+            }
+        }
+        else
+        {
+
+            try
+            {
+                Map<String, Object> attributes =
+                        QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
+                final String queueNameString = AMQShortString.toString(queueName);
+                attributes.put(Queue.NAME, queueNameString);
+                attributes.put(Queue.ID, UUID.randomUUID());
+                attributes.put(Queue.DURABLE, durable);
+
+                LifetimePolicy lifetimePolicy;
+                ExclusivityPolicy exclusivityPolicy;
+
+                if (exclusive)
+                {
+                    lifetimePolicy = autoDelete
+                            ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                            : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+                    exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+                }
+                else
+                {
+                    lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+                    exclusivityPolicy = ExclusivityPolicy.NONE;
+                }
+
+                attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+                attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+                queue = virtualHost.createQueue(attributes);
+
+                setDefaultQueue(queue);
+
+                if (!nowait)
+                {
+                    sync();
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    QueueDeclareOkBody responseBody =
+                            methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                    queue.getQueueDepthMessages(),
+                                                                    queue.getConsumerCount());
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                    _logger.info("Queue " + queueName + " declared successfully");
+                }
+            }
+            catch (QueueExistsException qe)
+            {
+
+                queue = qe.getExistingQueue();
+
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not created on this Connection.", getChannelId());
+
+                }
+                else if (queue.isExclusive() != exclusive)
+                {
+
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare queue '"
+                                                         + queue.getName()
+                                                         + "' with different exclusivity (was: "
+                                                         + queue.isExclusive()
+                                                         + " requested "
+                                                         + exclusive
+                                                         + ")");
+                }
+                else if ((autoDelete
+                          && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+                         || (!autoDelete && queue.getLifetimePolicy() != ((exclusive
+                                                                           && !durable)
+                        ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                        : LifetimePolicy.PERMANENT)))
+                {
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare queue '"
+                                                         + queue.getName()
+                                                         + "' with different lifetime policy (was: "
+                                                         + queue.getLifetimePolicy()
+                                                         + " requested autodelete: "
+                                                         + autoDelete
+                                                         + ")");
+                }
+                else if (queue.isDurable() != durable)
+                {
+                    closeChannel(AMQConstant.ALREADY_EXISTS,
+                                                         "Cannot re-declare queue '"
+                                                         + queue.getName()
+                                                         + "' with different durability (was: "
+                                                         + queue.isDurable()
+                                                         + " requested "
+                                                         + durable
+                                                         + ")");
+                }
+                else
+                {
+                    setDefaultQueue(queue);
+                    if (!nowait)
+                    {
+                        sync();
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        QueueDeclareOkBody responseBody =
+                                methodRegistry.createQueueDeclareOkBody(queueName,
+                                                                        queue.getQueueDepthMessages(),
+                                                                        queue.getConsumerCount());
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                        _logger.info("Queue " + queueName + " declared successfully");
+                    }
+                }
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+            }
+
+        }
+    }
+
+    @Override
+    public void receiveQueueDelete(final AMQShortString queueName,
+                                   final boolean ifUnused,
+                                   final boolean ifEmpty,
+                                   final boolean nowait)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        sync();
+        AMQQueue queue;
+        if (queueName == null)
+        {
+
+            //get the default queue on the channel:
+            queue = getDefaultQueue();
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+        }
+
+        if (queue == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+
+        }
+        else
+        {
+            if (ifEmpty && !queue.isEmpty())
+            {
+                closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty.");
+            }
+            else if (ifUnused && !queue.isUnused())
+            {
+                // TODO - Error code
+                closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used.");
+            }
+            else
+            {
+                if (!queue.verifySessionAccess(this))
+                {
+                    _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+                                                + queue.getName()
+                                                + " is exclusive, but not created on this Connection.", getChannelId());
+
+                }
+                else
+                {
+                    try
+                    {
+                        int purged = virtualHost.removeQueue(queue);
+
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                    }
+                    catch (AccessControlException e)
+                    {
+                        _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        AMQQueue queue = null;
+        if (queueName == null && (queue = getDefaultQueue()) == null)
+        {
+
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
+        }
+        else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+        }
+        else if (!queue.verifySessionAccess(this))
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+                                        "Queue is exclusive, but not created on this Connection.", getChannelId());
+        }
+        else
+        {
+            try
+            {
+                long purged = queue.clearQueue();
+                if (!nowait)
+                {
+                    sync();
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+                }
+            }
+            catch (AccessControlException e)
+            {
+                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+            }
+
+        }
+    }
+
+    @Override
+    public void receiveQueueUnbind(final AMQShortString queueName,
+                                   final AMQShortString exchange,
+                                   final AMQShortString routingKey,
+                                   final FieldTable arguments)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName +
+                          " exchange: " + exchange +
+                          " bindingKey: " + routingKey +
+                          " arguments: " + arguments + " ]");
+        }
+
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+
+        final boolean useDefaultQueue = queueName == null;
+        final AMQQueue queue = useDefaultQueue
+                ? getDefaultQueue()
+                : virtualHost.getQueue(queueName.toString());
+
+
+        if (queue == null)
+        {
+            String message = useDefaultQueue
+                    ? "No default queue defined on channel and queue was null"
+                    : "Queue " + queueName + " does not exist.";
+            closeChannel(AMQConstant.NOT_FOUND, message);
+        }
+        else if (isDefaultExchange(exchange))
+        {
+            _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue "
+                                                         + queue.getName()
+                                                         + " from the default exchange", getChannelId());
+
+        }
+        else
+        {
+
+            final ExchangeImpl exch = virtualHost.getExchange(exchange.toString());
+
+            if (exch == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
+            }
+            else if (!exch.hasBinding(String.valueOf(routingKey), queue))
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+            }
+            else
+            {
+                try
+                {
+                    exch.deleteBinding(String.valueOf(routingKey), queue);
+
+                    final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody();
+                    sync();
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+                }
+            }
+
+        }
+    }
+
+    @Override
+    public void receiveTxSelect()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxSelect");
+        }
+
+        setLocalTransactional();
+
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+        TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+        _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+    }
+
+    @Override
+    public void receiveTxCommit()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxCommit");
+        }
+
+
+        if (!isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: commit called on non-transactional channel");
+        }
+        commit(new Runnable()
+        {
+
+            @Override
+            public void run()
+            {
+                MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+                _connection.writeFrame(responseBody.generateFrame(_channelId));
+            }
+        }, true);
+
+    }
+
+    @Override
+    public void receiveTxRollback()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] TxRollback");
+        }
+
+        if (!isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: rollback called on non-transactional channel");
+        }
+
+        final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+        final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+

[... 31 lines stripped ...]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org