You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/12 01:46:39 UTC

svn commit: r1631137 [3/4] - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/...

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java Sat Oct 11 23:46:39 2014
@@ -22,12 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import java.security.AccessControlException;
 import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.UUID;
 
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
@@ -37,44 +31,20 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.filter.AMQInvalidArgumentException;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.NoFactoryForTypeException;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UnknownConfiguredObjectException;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
-import org.apache.qpid.server.virtualhost.QueueExistsException;
-import org.apache.qpid.server.virtualhost.RequiredExchangeException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class ServerMethodDispatcherImpl implements MethodDispatcher
 {
     private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class);
 
-    private final AMQProtocolSession<?> _connection;
+    private final AMQProtocolEngine _connection;
 
 
     private static interface ChannelAction
@@ -82,19 +52,19 @@ public class ServerMethodDispatcherImpl 
         void onChannel(ChannelMethodProcessor channel);
     }
 
-    public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
+    public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection)
     {
         return new ServerMethodDispatcherImpl(connection);
     }
 
 
-    public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection)
+    public ServerMethodDispatcherImpl(AMQProtocolEngine connection)
     {
         _connection = connection;
     }
 
 
-    protected final AMQProtocolSession<?> getConnection()
+    protected final AMQProtocolEngine getConnection()
     {
         return _connection;
     }
@@ -104,7 +74,7 @@ public class ServerMethodDispatcherImpl 
         final AMQChannel channel = _connection.getChannel(channelId);
         if (channel == null)
         {
-            // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+            closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId);
         }
         else
         {
@@ -121,595 +91,169 @@ public class ServerMethodDispatcherImpl 
 
     }
 
-    public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+    public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId)
     {
-        final AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
-        if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) )
-        {
-            throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
-        }
-
-        // We don't implement access control class, but to keep clients happy that expect it
-        // always use the "0" ticket.
-        AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
-        channel.sync();
-        _connection.writeFrame(response.generateFrame(channelId));
-        return true;
-    }
-
-    public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
-    {
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
-        }
-
-        final AMQChannel channel = _connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                                {
+                                    @Override
+                                    public void onChannel(final ChannelMethodProcessor channel)
+                                    {
+                                        channel.receiveAccessRequest(body.getRealm(),
+                                                                     body.getExclusive(),
+                                                                     body.getPassive(),
+                                                                     body.getActive(),
+                                                                     body.getWrite(),
+                                                                     body.getRead());
+                                    }
+                                }
+                            );
 
-        // this method throws an AMQException if the delivery tag is not known
-        channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
         return true;
     }
 
-    public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+    public boolean dispatchBasicAck(final BasicAckBody body, int channelId)
     {
-        final AMQChannel channel = _connection.getChannel(channelId);
-
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("BasicCancel: for:" + body.getConsumerTag() +
-                       " nowait:" + body.getNowait());
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                    channel.receiveBasicAck(body.getDeliveryTag(), body.getMultiple());
+                                 }
+                             }
+                            );
 
-        channel.unsubscribeConsumer(body.getConsumerTag());
-        if (!body.getNowait())
-        {
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-            BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
-            channel.sync();
-            _connection.writeFrame(cancelOkBody.generateFrame(channelId));
-        }
         return true;
     }
 
-    public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+    public boolean dispatchBasicCancel(final BasicCancelBody body, int channelId)
     {
-        AMQChannel channel = _connection.getChannel(channelId);
-        VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost();
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-        else
-        {
-            channel.sync();
-            String queueName = body.getQueue() == null ? null : body.getQueue().asString();
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("BasicConsume: from '" + queueName +
-                              "' for:" + body.getConsumerTag() +
-                              " nowait:" + body.getNowait() +
-                              " args:" + body.getArguments());
-            }
-
-            MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
-            final Collection<MessageSource> sources = new HashSet<>();
-            if(queue != null)
-            {
-                sources.add(queue);
-            }
-            else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
-                    && body.getArguments() != null
-                    && body.getArguments().get("x-multiqueue") instanceof Collection)
-            {
-                for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue"))
-                {
-                    String sourceName = String.valueOf(object);
-                    sourceName = sourceName.trim();
-                    if(sourceName.length() != 0)
-                    {
-                        MessageSource source = vHost.getMessageSource(sourceName);
-                        if(source == null)
-                        {
-                            sources.clear();
-                            break;
-                        }
-                        else
-                        {
-                            sources.add(source);
-                        }
-                    }
-                }
-                queueName = body.getArguments().get("x-multiqueue").toString();
-            }
-
-            if (sources.isEmpty())
-            {
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("No queue for '" + queueName + "'");
-                }
-                if (queueName != null)
-                {
-                    String msg = "No such queue, '" + queueName + "'";
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
-                }
-                else
-                {
-                    String msg = "No queue name provided, no default queue defined.";
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry());
-                }
-            }
-            else
-            {
-                final AMQShortString consumerTagName;
-
-                if (body.getConsumerTag() != null)
-                {
-                    consumerTagName = body.getConsumerTag().intern(false);
-                }
-                else
-                {
-                    consumerTagName = null;
-                }
-
-                try
-                {
-                    if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
-                    {
-
-                        AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
-                                                                               sources,
-                                                                               !body.getNoAck(),
-                                                                               body.getArguments(),
-                                                                               body.getExclusive(),
-                                                                               body.getNoLocal());
-                        if (!body.getNowait())
-                        {
-                            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-                            AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
-                            _connection.writeFrame(responseBody.generateFrame(channelId));
-
-                        }
-                    }
-                    else
-                    {
-                        AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
-
-                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-                        AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
-                                                                 msg,               // replytext
-                                                                 body.getClazz(),
-                                                                 body.getMethod());
-                        _connection.writeFrame(responseBody.generateFrame(0));
-                    }
-
-                }
-                catch (AMQInvalidArgumentException ise)
-                {
-                    _logger.debug("Closing connection due to invalid selector");
-
-                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
-                                                                                       AMQShortString.validValueOf(ise.getMessage()),
-                                                                                       body.getClazz(),
-                                                                                       body.getMethod());
-                    _connection.writeFrame(responseBody.generateFrame(channelId));
-
-
-                }
-                catch (AMQQueue.ExistingExclusiveConsumer e)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      "Cannot subscribe to queue "
-                                                      + queue.getName()
-                                                      + " as it already has an existing exclusive consumer",
-                                                      _connection.getMethodRegistry());
-                }
-                catch (AMQQueue.ExistingConsumerPreventsExclusive e)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      "Cannot subscribe to queue "
-                                                      + queue.getName()
-                                                      + " exclusively as it already has a consumer",
-                                                      _connection.getMethodRegistry());
-                }
-                catch (AccessControlException e)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      "Cannot subscribe to queue "
-                                                      + queue.getName()
-                                                      + " permission denied", _connection.getMethodRegistry());
-                }
-                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      "Cannot subscribe to queue "
-                                                      + queue.getName()
-                                                      + " as it already has an incompatible exclusivity policy",
-                                                      _connection.getMethodRegistry());
-                }
-
-            }
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                    channel.receiveBasicCancel(body.getConsumerTag(),
+                                                               body.getNowait()
+                                                              );
+                                 }
+                             }
+                            );
         return true;
     }
 
-    public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+    public boolean dispatchBasicConsume(final BasicConsumeBody body, int channelId)
     {
 
-        VirtualHostImpl vHost = _connection.getVirtualHost();
-
-        AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-        else
-        {
-            channel.sync();
-            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
-            if (queue == null)
-            {
-                _logger.info("No queue for '" + body.getQueue() + "'");
-                if(body.getQueue()!=null)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_FOUND,
-                                                      "No such queue, '" + body.getQueue() + "'",
-                                                      _connection.getMethodRegistry());
-                }
-                else
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "No queue name provided, no default queue defined.",
-                                                      _connection.getMethodRegistry());
-                }
-            }
-            else
-            {
-
-                try
-                {
-                    if (!performGet(queue, _connection, channel, !body.getNoAck()))
-                    {
-                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
-                        BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicConsume(body.getQueue(), body.getConsumerTag(),
+                                                                 body.getNoLocal(), body.getNoAck(),
+                                                                 body.getExclusive(), body.getNowait(),
+                                                                 body.getArguments());
+                                 }
+                             }
+                            );
 
 
-                        _connection.writeFrame(responseBody.generateFrame(channelId));
-                    }
-                }
-                catch (AccessControlException e)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      e.getMessage(), _connection.getMethodRegistry());
-                }
-                catch (MessageSource.ExistingExclusiveConsumer e)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue has an exclusive consumer",
-                                                      _connection.getMethodRegistry());
-                }
-                catch (MessageSource.ExistingConsumerPreventsExclusive e)
-                {
-                    throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
-                                                      "The GET request has been evaluated as an exclusive consumer, " +
-                                                      "this is likely due to a programming error in the Qpid broker",
-                                                      _connection.getMethodRegistry());
-                }
-                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue has an incompatible exclusivit policy",
-                                                      _connection.getMethodRegistry());
-                }
-            }
-        }
         return true;
     }
 
-    public static boolean performGet(final AMQQueue queue,
-                                     final AMQProtocolSession session,
-                                     final AMQChannel channel,
-                                     final boolean acks)
-            throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+    private void closeConnection(final AMQConstant constant,
+                                 final String message)
     {
-
-        final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
-
-        final GetDeliveryMethod getDeliveryMethod =
-                new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
-        final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
-        {
-
-            public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
-            {
-                channel.addUnacknowledgedMessage(entry, deliveryTag, null);
-            }
-        };
-
-        ConsumerTarget_0_8 target;
-        EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
-                                                          ConsumerImpl.Option.SEES_REQUEUES);
-        if(acks)
-        {
-
-            target = ConsumerTarget_0_8.createAckTarget(channel,
-                                                        AMQShortString.EMPTY_STRING, null,
-                                                        singleMessageCredit, getDeliveryMethod, getRecordMethod);
-        }
-        else
-        {
-            target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
-                                                             AMQShortString.EMPTY_STRING, null,
-                                                             singleMessageCredit, getDeliveryMethod, getRecordMethod);
-        }
-
-        ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
-        sub.flush();
-        sub.close();
-        return(getDeliveryMethod.hasDeliveredMessage());
-
-
+        _connection.closeConnection(constant, message, 0);
     }
 
-
-    private static class GetDeliveryMethod implements ClientDeliveryMethod
+    public boolean dispatchBasicGet(final BasicGetBody body, int channelId)
     {
 
-        private final FlowCreditManager _singleMessageCredit;
-        private final AMQProtocolSession _session;
-        private final AMQChannel _channel;
-        private final AMQQueue _queue;
-        private boolean _deliveredMessage;
-
-        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
-                                 final AMQProtocolSession session,
-                                 final AMQChannel channel, final AMQQueue queue)
-        {
-            _singleMessageCredit = singleMessageCredit;
-            _session = session;
-            _channel = channel;
-            _queue = queue;
-        }
-
-        @Override
-        public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
-                                    final InstanceProperties props, final long deliveryTag)
-        {
-            _singleMessageCredit.useCreditForMessage(message.getSize());
-            long size =_session.getProtocolOutputConverter().writeGetOk(message,
-                                                                        props,
-                                                                        _channel.getChannelId(),
-                                                                        deliveryTag,
-                                                                        _queue.getQueueDepthMessages());
-
-            _deliveredMessage = true;
-            return size;
-        }
-
-        public boolean hasDeliveredMessage()
-        {
-            return _deliveredMessage;
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicGet(body.getQueue(), body.getNoAck());
+                                 }
+                             }
+                            );
+        return true;
     }
 
-    public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+    public boolean dispatchBasicPublish(final BasicPublishBody body, int channelId)
     {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Publish received on channel " + channelId);
-        }
-
-        AMQShortString exchangeName = body.getExchange();
-        VirtualHostImpl vHost = _connection.getVirtualHost();
-
-        // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
-
-        MessageDestination destination;
-
-        if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
-        {
-            destination = vHost.getDefaultDestination();
-        }
-        else
-        {
-            destination = vHost.getMessageDestination(exchangeName.toString());
-        }
-
-        // if the exchange does not exist we raise a channel exception
-        if (destination == null)
-        {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
-                                           _connection.getMethodRegistry());
-        }
-        else
-        {
-            // The partially populated BasicDeliver frame plus the received route body
-            // is stored in the channel. Once the final body frame has been received
-            // it is routed to the exchange.
-            AMQChannel channel = _connection.getChannel(channelId);
-
-            if (channel == null)
-            {
-                throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-            }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicPublish(body.getExchange(), body.getRoutingKey(),
+                                                                 body.getMandatory(), body.getImmediate());
+                                 }
+                             }
+                            );
 
-            MessagePublishInfo info = new MessagePublishInfo(body.getExchange(),
-                                                             body.getImmediate(),
-                                                             body.getMandatory(),
-                                                             body.getRoutingKey());
-            info.setExchange(exchangeName);
-            try
-            {
-                channel.setPublishFrame(info, destination);
-            }
-            catch (AccessControlException e)
-            {
-                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                  e.getMessage(),
-                                                  _connection.getMethodRegistry());
-            }
-        }
         return true;
     }
 
-    public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+    public boolean dispatchBasicQos(final BasicQosBody body, int channelId)
     {
-        AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-        channel.sync();
-        channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
-
-
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-        AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
-        _connection.writeFrame(responseBody.generateFrame(channelId));
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicQos(body.getPrefetchSize(), body.getPrefetchCount(),
+                                                             body.getGlobal());
+                                 }
+                             }
+                            );
 
         return true;
     }
 
-    public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+    public boolean dispatchBasicRecover(final BasicRecoverBody body, int channelId)
     {
-        _logger.debug("Recover received on protocol session " + _connection
-                                                + " and channel " + channelId);
-        AMQChannel channel = _connection.getChannel(channelId);
-
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        channel.resend();
+        final boolean sync = _connection.getProtocolVersion().equals(ProtocolVersion.v8_0);
 
-        // Qpid 0-8 hacks a synchronous -ok onto recover.
-        // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
-        if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
-        {
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-            AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
-            channel.sync();
-            _connection.writeFrame(recoverOk.generateFrame(channelId));
-
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicRecover(body.getRequeue(), sync);
+                                 }
+                             }
+                            );
 
         return true;
     }
 
-    public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+    public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException
     {
 
-        AMQChannel channel = _connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Rejecting:" + body.getDeliveryTag() +
-                          ": Requeue:" + body.getRequeue() +
-                          " on channel:" + channel.debugIdentity());
-        }
-
-        long deliveryTag = body.getDeliveryTag();
-
-        MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
-
-        if (message == null)
-        {
-            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-        }
-        else
-        {
-
-            if (message.getMessage() == null)
-            {
-                _logger.warn("Message has already been purged, unable to Reject.");
-            }
-            else
-            {
-
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
-                                  ": Requeue:" + body.getRequeue() +
-                                  " on channel:" + channel.debugIdentity());
-                }
-
-                if (body.getRequeue())
-                {
-                    //this requeue represents a message rejected from the pre-dispatch queue
-                    //therefore we need to amend the delivery counter.
-                    message.decrementDeliveryCount();
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicReject(body.getDeliveryTag(), body.getRequeue());
+                                 }
+                             }
+                            );
 
-                    channel.requeue(deliveryTag);
-                }
-                else
-                {
-                    // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
-                    // as it would prevent redelivery
-                    // message.reject();
-
-                    final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
-                    _logger.debug("maxDeliveryCountEnabled: "
-                                  + maxDeliveryCountEnabled
-                                  + " deliveryTag "
-                                  + deliveryTag);
-                    if (maxDeliveryCountEnabled)
-                    {
-                        final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
-                        _logger.debug("deliveredTooManyTimes: "
-                                      + deliveredTooManyTimes
-                                      + " deliveryTag "
-                                      + deliveryTag);
-                        if (deliveredTooManyTimes)
-                        {
-                            channel.deadLetter(body.getDeliveryTag());
-                        }
-                        else
-                        {
-                            //this requeue represents a message rejected because of a recover/rollback that we
-                            //are not ready to DLQ. We rely on the reject command to resend from the unacked map
-                            //and therefore need to increment the delivery counter so we cancel out the effect
-                            //of the AMQChannel#resend() decrement.
-                            message.incrementDeliveryCount();
-                        }
-                    }
-                    else
-                    {
-                        channel.requeue(deliveryTag);
-                    }
-                }
-            }
-        }
         return true;
     }
 
@@ -720,7 +264,9 @@ public class ServerMethodDispatcherImpl 
         // Protect the broker against out of order frame request.
         if (virtualHost == null)
         {
-            throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+            throw new AMQException(AMQConstant.COMMAND_INVALID,
+                                   "Virtualhost has not yet been set. ConnectionOpen has not been called.",
+                                   null);
         }
         _logger.info("Connecting to: " + virtualHost.getName());
 
@@ -783,30 +329,17 @@ public class ServerMethodDispatcherImpl 
     public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
     {
 
-        if (_logger.isInfoEnabled())
-        {
-            _logger.info("Received channel close for id " + channelId
-                                             + " citing class " + body.getClassId() +
-                         " and method " + body.getMethodId());
-        }
-
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveChannelClose();
+                                 }
+                             }
+                            );
 
-        AMQChannel channel = _connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getConnectionException(AMQConstant.CHANNEL_ERROR,
-                                              "Trying to close unknown channel",
-                                              _connection.getMethodRegistry());
-        }
-        channel.sync();
-        _connection.closeChannel(channelId);
-        // Client requested closure so we don't wait for ok we send it
-        _connection.closeChannelOk(channelId);
-
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-        ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
-        _connection.writeFrame(responseBody.generateFrame(channelId));
         return true;
     }
 
@@ -814,32 +347,34 @@ public class ServerMethodDispatcherImpl 
     public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
     {
 
-        _logger.info("Received channel-close-ok for channel-id " + channelId);
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveChannelCloseOk();
+                                 }
+                             }
+                            );
 
-        // Let the Protocol Session know the channel is now closed.
-        _connection.closeChannelOk(channelId);
         return true;
     }
 
 
-    public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
-    {
-        final AMQProtocolSession<?> connection = getConnection();
-
-
-        AMQChannel channel = connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
-        }
-        channel.sync();
-        channel.setSuspended(!body.getActive());
-        _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
-
-        MethodRegistry methodRegistry = connection.getMethodRegistry();
-        AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
-        connection.writeFrame(responseBody.generateFrame(channelId));
+    public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException
+    {
+
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveChannelFlow(body.getActive());
+                                 }
+                             }
+                            );
         return true;
     }
 
@@ -861,7 +396,8 @@ public class ServerMethodDispatcherImpl 
         String virtualHostName;
         if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
         {
-            virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
+            virtualHostName =
+                    new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
         }
         else
         {
@@ -872,41 +408,41 @@ public class ServerMethodDispatcherImpl 
 
         if (virtualHost == null)
         {
-            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
-                                              _connection.getMethodRegistry());
+            closeConnection(AMQConstant.NOT_FOUND,
+                            "Unknown virtual host: '" + virtualHostName + "'");
+
         }
         else
         {
             // Check virtualhost access
             if (virtualHost.getState() != State.ACTIVE)
             {
-                throw body.getConnectionException(AMQConstant.CONNECTION_FORCED,
-                                                  "Virtual host '" + virtualHost.getName() + "' is not active",
-                                                  _connection.getMethodRegistry());
-            }
+                closeConnection(AMQConstant.CONNECTION_FORCED,
+                                "Virtual host '" + virtualHost.getName() + "' is not active"
+                               );
 
-            _connection.setVirtualHost(virtualHost);
-            try
-            {
-                virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
-            }
-            catch (AccessControlException e)
-            {
-                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                  e.getMessage(),
-                                                  _connection.getMethodRegistry());
             }
-
-            // See Spec (0.8.2). Section  3.1.2 Virtual Hosts
-            if (_connection.getContextKey() == null)
+            else
             {
-                _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
-            }
+                _connection.setVirtualHost(virtualHost);
+                try
+                {
+                    virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
+                    if (_connection.getContextKey() == null)
+                    {
+                        _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+                    }
 
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-            AMQMethodBody responseBody =  methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
 
-            _connection.writeFrame(responseBody.generateFrame(channelId));
+                    _connection.writeFrame(responseBody.generateFrame(channelId));
+                }
+                catch (AccessControlException e)
+                {
+                    closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                }
+            }
         }
         return true;
     }
@@ -1067,7 +603,7 @@ public class ServerMethodDispatcherImpl 
 
                 int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
 
-                if(frameMax <= 0)
+                if (frameMax <= 0)
                 {
                     frameMax = Integer.MAX_VALUE;
                 }
@@ -1089,12 +625,12 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    private void disposeSaslServer(AMQProtocolSession ps)
+    private void disposeSaslServer(AMQProtocolEngine connection)
     {
-        SaslServer ss = ps.getSaslServer();
+        SaslServer ss = connection.getSaslServer();
         if (ss != null)
         {
-            ps.setSaslServer(null);
+            connection.setSaslServer(null);
             try
             {
                 ss.dispose();
@@ -1123,60 +659,66 @@ public class ServerMethodDispatcherImpl 
 
             if (ss == null)
             {
-                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR,
-                                                  "Unable to create SASL Server:" + body.getMechanism(),
-                                                  _connection.getMethodRegistry());
+                closeConnection(AMQConstant.RESOURCE_ERROR,
+                                "Unable to create SASL Server:" + body.getMechanism()
+                               );
+
             }
+            else
+            {
 
-            _connection.setSaslServer(ss);
+                _connection.setSaslServer(ss);
 
-            final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
-            //save clientProperties
-            _connection.setClientProperties(body.getClientProperties());
+                final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+                //save clientProperties
+                _connection.setClientProperties(body.getClientProperties());
 
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                MethodRegistry methodRegistry = _connection.getMethodRegistry();
 
-            switch (authResult.getStatus())
-            {
-                case ERROR:
-                    Exception cause = authResult.getCause();
+                switch (authResult.getStatus())
+                {
+                    case ERROR:
+                        Exception cause = authResult.getCause();
 
-                    _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+                        _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
 
-                    ConnectionCloseBody closeBody =
-                            methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
-                                                                     AMQConstant.NOT_ALLOWED.getName(),
-                                                                     body.getClazz(),
-                                                                     body.getMethod());
-
-                    _connection.writeFrame(closeBody.generateFrame(0));
-                    disposeSaslServer(_connection);
-                    break;
+                        ConnectionCloseBody closeBody =
+                                methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+                                                                         // replyCode
+                                                                         AMQConstant.NOT_ALLOWED.getName(),
+                                                                         body.getClazz(),
+                                                                         body.getMethod());
+
+                        _connection.writeFrame(closeBody.generateFrame(0));
+                        disposeSaslServer(_connection);
+                        break;
 
-                case SUCCESS:
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Connected as: " + authResult.getSubject());
-                    }
-                    _connection.setAuthorizedSubject(authResult.getSubject());
+                    case SUCCESS:
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Connected as: " + authResult.getSubject());
+                        }
+                        _connection.setAuthorizedSubject(authResult.getSubject());
 
-                    int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+                        int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
 
-                    if(frameMax <= 0)
-                    {
-                        frameMax = Integer.MAX_VALUE;
-                    }
+                        if (frameMax <= 0)
+                        {
+                            frameMax = Integer.MAX_VALUE;
+                        }
 
-                    ConnectionTuneBody
-                            tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
-                                                                               frameMax,
-                                                                               broker.getConnection_heartBeatDelay());
-                    _connection.writeFrame(tuneBody.generateFrame(0));
-                    break;
-                case CONTINUE:
-                    ConnectionSecureBody
-                            secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
-                    _connection.writeFrame(secureBody.generateFrame(0));
+                        ConnectionTuneBody
+                                tuneBody =
+                                methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+                                                                        frameMax,
+                                                                        broker.getConnection_heartBeatDelay());
+                        _connection.writeFrame(tuneBody.generateFrame(0));
+                        break;
+                    case CONTINUE:
+                        ConnectionSecureBody
+                                secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                        _connection.writeFrame(secureBody.generateFrame(0));
+                }
             }
         }
         catch (SaslException e)
@@ -1189,38 +731,33 @@ public class ServerMethodDispatcherImpl 
 
     public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
     {
-        final AMQProtocolSession<?> connection = getConnection();
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(body);
-        }
+        final AMQProtocolEngine connection = getConnection();
 
         connection.initHeartbeats(body.getHeartbeat());
 
         int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-        if(brokerFrameMax <= 0)
+        if (brokerFrameMax <= 0)
         {
             brokerFrameMax = Integer.MAX_VALUE;
         }
 
-        if(body.getFrameMax() > (long) brokerFrameMax)
+        if (body.getFrameMax() > (long) brokerFrameMax)
         {
             throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
                                              "Attempt to set max frame size to " + body.getFrameMax()
                                              + " greater than the broker will allow: "
                                              + brokerFrameMax,
                                              body.getClazz(), body.getMethod(),
-                                             connection.getMethodRegistry(),null);
+                                             connection.getMethodRegistry(), null);
         }
-        else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+        else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
         {
             throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
                                              "Attempt to set max frame size to " + body.getFrameMax()
                                              + " which is smaller than the specification definined minimum: "
                                              + AMQConstant.FRAME_MIN_SIZE.getCode(),
                                              body.getClazz(), body.getMethod(),
-                                             connection.getMethodRegistry(),null);
+                                             connection.getMethodRegistry(), null);
         }
         int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
         connection.setMaxFrameSize(frameMax);
@@ -1231,1035 +768,246 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public static final int OK = 0;
-    public static final int EXCHANGE_NOT_FOUND = 1;
-    public static final int QUEUE_NOT_FOUND = 2;
-    public static final int NO_BINDINGS = 3;
-    public static final int QUEUE_NOT_BOUND = 4;
-    public static final int NO_QUEUE_BOUND_WITH_RK = 5;
-    public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
-
-    public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+    public boolean dispatchExchangeBound(final ExchangeBoundBody body, int channelId)
     {
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
-        final AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-        channel.sync();
-
-
-        AMQShortString exchangeName = body.getExchange();
-        AMQShortString queueName = body.getQueue();
-        AMQShortString routingKey = body.getRoutingKey();
-        ExchangeBoundOkBody response;
-
-        if(isDefaultExchange(exchangeName))
-        {
-            if(routingKey == null)
-            {
-                if(queueName == null)
-                {
-                    response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
-                }
-                else
-                {
-                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
-                    if (queue == null)
-                    {
-
-                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                                                                            AMQShortString.validValueOf("Queue '" + queueName + "' not found"));	// replyText
-                    }
-                    else
-                    {
-                        response = methodRegistry.createExchangeBoundOkBody(OK, null);
-                    }
-                }
-            }
-            else
-            {
-                if(queueName == null)
-                {
-                    response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
-                }
-                else
-                {
-                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
-                    if (queue == null)
-                    {
-
-                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                                                                            AMQShortString.validValueOf("Queue '" + queueName + "' not found"));	// replyText
-                    }
-                    else
-                    {
-                        response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
-                    }
-                }
-            }
-        }
-        else
-        {
-            ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
-            if (exchange == null)
-            {
-
-
-                response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
-                                                                    AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found"));
-            }
-            else if (routingKey == null)
-            {
-                if (queueName == null)
-                {
-                    if (exchange.hasBindings())
-                    {
-                        response = methodRegistry.createExchangeBoundOkBody(OK, null);
-                    }
-                    else
-                    {
-
-                        response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,	// replyCode
-                            null);	// replyText
-                    }
-                }
-                else
-                {
-
-                    AMQQueue queue = virtualHost.getQueue(queueName.toString());
-                    if (queue == null)
-                    {
-
-                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                            AMQShortString.validValueOf("Queue '" + queueName + "' not found"));	// replyText
-                    }
-                    else
-                    {
-                        if (exchange.isBound(queue))
-                        {
-
-                            response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                                null);	// replyText
-                        }
-                        else
-                        {
-
-                            response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,	// replyCode
-                                AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'"));	// replyText
-                        }
-                    }
-                }
-            }
-            else if (queueName != null)
-            {
-                AMQQueue queue = virtualHost.getQueue(queueName.toString());
-                if (queue == null)
-                {
-
-                    response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
-                        AMQShortString.validValueOf("Queue '" + queueName + "' not found"));	// replyText
-                }
-                else
-                {
-                    String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
-                    if (exchange.isBound(bindingKey, queue))
-                    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveExchangeBound(body.getExchange(), body.getQueue(), body.getRoutingKey());
+                                 }
+                             }
+                            );
 
-                        response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                            null);	// replyText
-                    }
-                    else
-                    {
+        return true;
+    }
 
-                        String message = "Queue '" + queueName + "' not bound with routing key '" +
-                                            body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+    public boolean dispatchExchangeDeclare(final ExchangeDeclareBody body, int channelId)
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveExchangeDeclare(body.getExchange(), body.getType(),
+                                                                    body.getPassive(),
+                                                                    body.getDurable(),
+                                                                    body.getAutoDelete(),
+                                                                    body.getInternal(),
+                                                                    body.getNowait(),
+                                                                    body.getArguments());
+                                 }
+                             }
+                            );
 
-                        response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
-                            AMQShortString.validValueOf(message));	// replyText
-                    }
-                }
-            }
-            else
-            {
-                if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
-                {
+        return true;
+    }
 
-                    response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
-                        null);	// replyText
-                }
-                else
-                {
+    public boolean dispatchExchangeDelete(final ExchangeDeleteBody body, int channelId)
+    {
+
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveExchangeDelete(body.getExchange(),
+                                                                   body.getIfUnused(),
+                                                                   body.getNowait());
+                                 }
+                             }
+                            );
 
-                    response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                        AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() +
-                        "' to exchange '" + exchangeName + "'"));	// replyText
-                }
-            }
-        }
-        _connection.writeFrame(response.generateFrame(channelId));
         return true;
     }
 
-    public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+    private boolean isDefaultExchange(final AMQShortString exchangeName)
     {
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-        final AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        final AMQShortString exchangeName = body.getExchange();
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
-        }
-
-        ExchangeImpl exchange;
-
-        if(isDefaultExchange(exchangeName))
-        {
-            if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType()))
-            {
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
-                                                                          + " of type "
-                                                                          + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
-                                                                          + " to " + body.getType() +".",
-                                                 body.getClazz(), body.getMethod(),
-                                                 _connection.getMethodRegistry(),null);
-            }
-        }
-        else
-        {
-            if (body.getPassive())
-            {
-                exchange = virtualHost.getExchange(exchangeName.toString());
-                if(exchange == null)
-                {
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
-                                                   _connection.getMethodRegistry());
-                }
-                else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
-                {
+        return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
+    }
 
-                    throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
-                                      exchangeName + " of type " + exchange.getType()
-                                      + " to " + body.getType() +".",
-                                                     body.getClazz(), body.getMethod(),
-                                                     _connection.getMethodRegistry(),null);
-                }
+    public boolean dispatchQueueBind(final QueueBindBody body, int channelId)
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveQueueBind(body.getQueue(),
+                                                              body.getExchange(),
+                                                              body.getRoutingKey(),
+                                                              body.getNowait(),
+                                                              body.getArguments());
+                                 }
+                             }
+                            );
+
+        return true;
+    }
+
+    public boolean dispatchQueueDeclare(final QueueDeclareBody body, int channelId)
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveQueueDeclare(body.getQueue(),
+                                                                 body.getPassive(),
+                                                                 body.getDurable(),
+                                                                 body.getExclusive(),
+                                                                 body.getAutoDelete(),
+                                                                 body.getNowait(),
+                                                                 body.getArguments());
+                                 }
+                             }
+                            );
+
+        return true;
+    }
+
+    public boolean dispatchQueueDelete(final QueueDeleteBody body, int channelId)
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveQueueDelete(body.getQueue(),
+                                                                body.getIfUnused(),
+                                                                body.getIfEmpty(),
+                                                                body.getNowait());
+                                 }
+                             }
+                            );
+
+        return true;
+    }
+
+    public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException
+    {
+
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveQueuePurge(body.getQueue(),
+                                                               body.getNowait());
+                                 }
+                             }
+                            );
 
-            }
-            else
-            {
-                try
-                {
-                    String name = exchangeName == null ? null : exchangeName.intern().toString();
-                    String type = body.getType() == null ? null : body.getType().intern().toString();
+        return true;
+    }
 
-                    Map<String,Object> attributes = new HashMap<String, Object>();
-                    if(body.getArguments() != null)
-                    {
-                        attributes.putAll(FieldTable.convertToMap(body.getArguments()));
-                    }
-                    attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
-                    attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
-                    attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
-                    attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
-                    attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
-                                   body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
-                    if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
-                    {
-                        attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
-                    }
-                    exchange = virtualHost.createExchange(attributes);
 
-                }
-                catch(ReservedExchangeNameException e)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Attempt to declare exchange: " + exchangeName +
-                                                      " which begins with reserved prefix.",
-                                                      _connection.getMethodRegistry());
+    public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
+    {
 
-                }
-                catch(ExchangeExistsException e)
-                {
-                    exchange = e.getExistingExchange();
-                    if(!new AMQShortString(exchange.getType()).equals(body.getType()))
-                    {
-                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
-                                                                                   + exchangeName + " of type "
-                                                                                   + exchange.getType()
-                                                                                   + " to " + body.getType() + ".",
-                                                          _connection.getMethodRegistry());
-                    }
-                }
-                catch(NoFactoryForTypeException e)
-                {
-                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
-                                                      "Unknown exchange type '"
-                                                      + e.getType()
-                                                      + "' for exchange '"
-                                                      + exchangeName
-                                                      + "'",
-                                                      _connection.getMethodRegistry());
-                }
-                catch (AccessControlException e)
-                {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      e.getMessage(),
-                                                      _connection.getMethodRegistry());
-                }
-                catch (UnknownConfiguredObjectException e)
-                {
-                    // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
-                    throw body.getConnectionException(AMQConstant.NOT_FOUND,
-                                                      "Unknown alternate exchange "
-                                                      + (e.getName() != null
-                                                              ? "name: \"" + e.getName() + "\""
-                                                              : "id: " + e.getId()),
-                                                      _connection.getMethodRegistry());
-                }
-                catch (IllegalArgumentException e)
-                {
-                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
-                                                      "Error creating exchange '"
-                                                      + exchangeName
-                                                      + "': "
-                                                      + e.getMessage(),
-                                                      _connection.getMethodRegistry());
-                }
-            }
-        }
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveTxCommit();
+                                 }
+                             }
+                            );
 
-        if(!body.getNowait())
-        {
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-            AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
-            channel.sync();
-            _connection.writeFrame(responseBody.generateFrame(channelId));
-        }
         return true;
     }
 
-    public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+    public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
     {
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-        final AMQChannel channel = _connection.getChannel(channelId);
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-        channel.sync();
-        try
-        {
-
-            if(isDefaultExchange(body.getExchange()))
-            {
-                throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                  "Default Exchange cannot be deleted",
-                                                  _connection.getMethodRegistry());
-            }
-
-            final String exchangeName = body.getExchange().toString();
-
-            final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
-            if(exchange == null)
-            {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
-                                               _connection.getMethodRegistry());
-            }
 
-            virtualHost.removeExchange(exchange, !body.getIfUnused());
-
-            ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveTxRollback();
+                                 }
+                             }
+                            );
+        return true;
+    }
 
-            _connection.writeFrame(responseBody.generateFrame(channelId));
-        }
+    public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveTxSelect();
+                                 }
+                             }
+                            );
+        return true;
+    }
 
-        catch (ExchangeIsAlternateException e)
-        {
-            throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
-                                           _connection.getMethodRegistry());
+    public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException
+    {
+        processChannelMethod(channelId,
+                             new ChannelAction()
+                             {
+                                 @Override
+                                 public void onChannel(final ChannelMethodProcessor channel)
+                                 {
+                                     channel.receiveBasicRecover(body.getRequeue(), true);
+                                 }
+                             }
+                            );
 
-        }
-        catch (RequiredExchangeException e)
-        {
-            throw body.getChannelException(AMQConstant.NOT_ALLOWED,
-                                           "Exchange '" + body.getExchange() + "' cannot be deleted",
-                                           _connection.getMethodRegistry());
-        }
-        catch (AccessControlException e)
-        {
-            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                              e.getMessage(),
-                                              _connection.getMethodRegistry());
-        }
         return true;
     }
 
-    private boolean isDefaultExchange(final AMQShortString exchangeName)
+    public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
     {
-        return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
+        throw new UnexpectedMethodException(body);
     }
 
-    public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+    @Override
+    public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
+            throws AMQException
     {
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-        AMQChannel channel = _connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        final AMQQueue queue;
-        final AMQShortString routingKey;
-
-        final AMQShortString queueName = body.getQueue();
-
-        if (queueName == null)
-        {
-
-            queue = channel.getDefaultQueue();
-
-            if (queue == null)
-            {
-                throw body.getChannelException(AMQConstant.NOT_FOUND,
-                                               "No default queue defined on channel and queue was null",
-                                               _connection.getMethodRegistry());
-            }
+        throw new UnexpectedMethodException(body);
+    }
 
-            if (body.getRoutingKey() == null)
-            {
-                routingKey = AMQShortString.valueOf(queue.getName());
-            }
-            else
-            {
-                routingKey = body.getRoutingKey().intern();
-            }
-        }
-        else
-        {
-            queue = virtualHost.getQueue(queueName.toString());
-            routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
-        }
+    public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
 
-        if (queue == null)
-        {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
-                                           _connection.getMethodRegistry());
-        }
-
-        if(isDefaultExchange(body.getExchange()))
-        {
-            throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                              "Cannot bind the queue " + queueName + " to the default exchange",
-                                              _connection.getMethodRegistry());
-        }
-
-        final String exchangeName = body.getExchange().toString();
-
-        final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
-        if (exch == null)
-        {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
-                                           _connection.getMethodRegistry());
-        }
-
-
-        try
-        {
-
-            Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
-            String bindingKey = String.valueOf(routingKey);
-
-            if (!exch.isBound(bindingKey, arguments, queue))
-            {
-
-                if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
-                {
-                    exch.replaceBinding(bindingKey, queue, arguments);
-                }
-            }
-        }
-        catch (AccessControlException e)
-        {
-            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                              e.getMessage(),
-                                              _connection.getMethodRegistry());
-        }
-
-        if (_logger.isInfoEnabled())
-        {
-            _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
-        }
-        if (!body.getNowait())
-        {
-            channel.sync();
-            MethodRegistry methodRegistry = _connection.getMethodRegistry();
-            AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
-            _connection.writeFrame(responseBody.generateFrame(channelId));
-
-        }
-        return true;
-    }
-
-    public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+    public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException
     {
-        final AMQSessionModel session = _connection.getChannel(channelId);
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
-        final AMQShortString queueName;
-
-        // if we aren't given a queue name, we create one which we return to the client
-        if ((body.getQueue() == null) || (body.getQueue().length() == 0))
-        {
-            queueName = new AMQShortString("tmp_" + UUID.randomUUID());
-        }
-        else
-        {
-            queueName = body.getQueue().intern();
-        }
-
-        AMQQueue queue;
-
-        //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
-
-        AMQChannel channel = _connection.getChannel(channelId);
-
-        if (channel == null)
-        {
-            throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
-        }
-
-        if(body.getPassive())
-        {
-            queue = virtualHost.getQueue(queueName.toString());
-            if (queue == null)
-            {
-                String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
-                throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
-            }
-            else
-            {
-                if (!queue.verifySessionAccess(channel))
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue "
-                                                      + queue.getName()
-                                                      + " is exclusive, but not created on this Connection.",
-                                                      _connection.getMethodRegistry());
-                }
-
-                //set this as the default queue on the channel:
-                channel.setDefaultQueue(queue);
-            }
-        }
-        else
-        {
-
-            try
-            {
-
-                queue = createQueue(channel, queueName, body, virtualHost, _connection);
-
-            }
-            catch(QueueExistsException qe)
-            {
-
-                queue = qe.getExistingQueue();
-
-                if (!queue.verifySessionAccess(channel))
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue "
-                                                      + queue.getName()
-                                                      + " is exclusive, but not created on this Connection.",
-                                                      _connection.getMethodRegistry());
-                }
-                else if(queue.isExclusive() != body.getExclusive())
-                {
-
-                    throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
-                                                   "Cannot re-declare queue '"
-                                                   + queue.getName()
-                                                   + "' with different exclusivity (was: "
-                                                   + queue.isExclusive()
-                                                   + " requested "
-                                                   + body.getExclusive()
-                                                   + ")",
-                                                   _connection.getMethodRegistry());
-                }
-                else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
-                    || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
-                {

[... 478 lines stripped ...]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org