You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/28 17:22:10 UTC

svn commit: r1628074 [2/6] - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/...

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.server.filter.AMQ
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -56,16 +55,16 @@ public class BasicConsumeMethodHandler i
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicConsumeBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
-
-        AMQChannel channel = protocolConnection.getChannel(channelId);
-        VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
+        AMQChannel channel = connection.getChannel(channelId);
+        VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost();
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         else
         {
@@ -119,12 +118,12 @@ public class BasicConsumeMethodHandler i
                 if (queueName != null)
                 {
                     String msg = "No such queue, '" + queueName + "'";
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
                 }
                 else
                 {
                     String msg = "No queue name provided, no default queue defined.";
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry());
                 }
             }
             else
@@ -153,9 +152,9 @@ public class BasicConsumeMethodHandler i
                                                                                body.getNoLocal());
                         if (!body.getNowait())
                         {
-                            MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+                            MethodRegistry methodRegistry = connection.getMethodRegistry();
                             AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
-                            protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+                            connection.writeFrame(responseBody.generateFrame(channelId));
 
                         }
                     }
@@ -163,12 +162,12 @@ public class BasicConsumeMethodHandler i
                     {
                         AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
 
-                        MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+                        MethodRegistry methodRegistry = connection.getMethodRegistry();
                         AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
                                                                  msg,               // replytext
                                                                  body.getClazz(),
                                                                  body.getMethod());
-                        protocolConnection.writeFrame(responseBody.generateFrame(0));
+                        connection.writeFrame(responseBody.generateFrame(0));
                     }
 
                 }
@@ -176,12 +175,12 @@ public class BasicConsumeMethodHandler i
                 {
                     _logger.debug("Closing connection due to invalid selector");
 
-                    MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+                    MethodRegistry methodRegistry = connection.getMethodRegistry();
                     AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
                                                                                        AMQShortString.validValueOf(ise.getMessage()),
                                                                                        body.getClazz(),
                                                                                        body.getMethod());
-                    protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+                    connection.writeFrame(responseBody.generateFrame(channelId));
 
 
                 }
@@ -190,28 +189,28 @@ public class BasicConsumeMethodHandler i
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
                                                       "Cannot subscribe to queue "
                                                       + queue.getName()
-                                                      + " as it already has an existing exclusive consumer");
+                                                      + " as it already has an existing exclusive consumer", connection.getMethodRegistry());
                 }
                 catch (AMQQueue.ExistingConsumerPreventsExclusive e)
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
                                                       "Cannot subscribe to queue "
                                                       + queue.getName()
-                                                      + " exclusively as it already has a consumer");
+                                                      + " exclusively as it already has a consumer", connection.getMethodRegistry());
                 }
                 catch (AccessControlException e)
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
                                                       "Cannot subscribe to queue "
                                                       + queue.getName()
-                                                      + " permission denied");
+                                                      + " permission denied", connection.getMethodRegistry());
                 }
                 catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
                                                       "Cannot subscribe to queue "
                                                       + queue.getName()
-                                                      + " as it already has an incompatible exclusivity policy");
+                                                      + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry());
                 }
 
             }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
 import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
 import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -65,17 +64,17 @@ public class BasicGetMethodHandler imple
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicGetBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
 
+        VirtualHostImpl vHost = connection.getVirtualHost();
 
-        VirtualHostImpl vHost = protocolConnection.getVirtualHost();
-
-        AMQChannel channel = protocolConnection.getChannel(channelId);
+        AMQChannel channel = connection.getChannel(channelId);
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         else
         {
@@ -87,12 +86,12 @@ public class BasicGetMethodHandler imple
                 if(body.getQueue()!=null)
                 {
                     throw body.getConnectionException(AMQConstant.NOT_FOUND,
-                                                      "No such queue, '" + body.getQueue()+ "'");
+                                                      "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry());
                 }
                 else
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "No queue name provided, no default queue defined.");
+                                                      "No queue name provided, no default queue defined.", connection.getMethodRegistry());
                 }
             }
             else
@@ -100,36 +99,37 @@ public class BasicGetMethodHandler imple
 
                 try
                 {
-                    if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+                    if (!performGet(queue,connection, channel, !body.getNoAck()))
                     {
-                        MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+                        MethodRegistry methodRegistry = connection.getMethodRegistry();
                         // TODO - set clusterId
                         BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
 
 
-                        protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+                        connection.writeFrame(responseBody.generateFrame(channelId));
                     }
                 }
                 catch (AccessControlException e)
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      e.getMessage());
+                                                      e.getMessage(), connection.getMethodRegistry());
                 }
                 catch (MessageSource.ExistingExclusiveConsumer e)
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue has an exclusive consumer");
+                                                      "Queue has an exclusive consumer", connection.getMethodRegistry());
                 }
                 catch (MessageSource.ExistingConsumerPreventsExclusive e)
                 {
                     throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
                                                       "The GET request has been evaluated as an exclusive consumer, " +
-                                                      "this is likely due to a programming error in the Qpid broker");
+                                                      "this is likely due to a programming error in the Qpid broker",
+                                                      connection.getMethodRegistry());
                 }
                 catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue has an incompatible exclusivit policy");
+                                                      "Queue has an incompatible exclusivit policy", connection.getMethodRegistry());
                 }
             }
         }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8.handler;
 
+import java.security.AccessControlException;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
@@ -30,12 +32,9 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-import java.security.AccessControlException;
-
 public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
 {
     private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
@@ -52,16 +51,17 @@ public class BasicPublishMethodHandler i
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicPublishBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Publish received on channel " + channelId);
         }
 
         AMQShortString exchangeName = body.getExchange();
-        VirtualHostImpl vHost = session.getVirtualHost();
+        VirtualHostImpl vHost = connection.getVirtualHost();
 
         // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
 
@@ -79,21 +79,22 @@ public class BasicPublishMethodHandler i
         // if the exchange does not exist we raise a channel exception
         if (destination == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+                                           connection.getMethodRegistry());
         }
         else
         {
             // The partially populated BasicDeliver frame plus the received route body
             // is stored in the channel. Once the final body frame has been received
             // it is routed to the exchange.
-            AMQChannel channel = session.getChannel(channelId);
+            AMQChannel channel = connection.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(channelId);
+                throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
             }
 
-            MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+            MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
             info.setExchange(exchangeName);
             try
             {
@@ -101,7 +102,7 @@ public class BasicPublishMethodHandler i
             }
             catch (AccessControlException e)
             {
-                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
             }
         }
     }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java Sun Sep 28 15:22:03 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicQosB
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
@@ -38,21 +37,22 @@ public class BasicQosHandler implements 
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicQosBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = connection.getChannel(channelId);
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         channel.sync();
         channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
 
 
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
         AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
-        session.writeFrame(responseBody.generateFrame(channelId));
+        connection.writeFrame(responseBody.generateFrame(channelId));
 
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.ProtocolV
 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
@@ -43,29 +42,29 @@ public class BasicRecoverMethodHandler i
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicRecoverBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-        _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
-        AMQChannel channel = session.getChannel(channelId);
+        _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+        AMQChannel channel = connection.getChannel(channelId);
 
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
 
         channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
-        if(session.getProtocolVersion().equals(ProtocolVersion.v8_0))
+        if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
         {
-            MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
+            MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry();
             AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
             channel.sync();
-            session.writeFrame(recoverOk.generateFrame(channelId));
+            connection.writeFrame(recoverOk.generateFrame(channelId));
 
         }
 

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -31,7 +31,6 @@ import org.apache.qpid.framing.amqp_0_9.
 import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
@@ -45,35 +44,36 @@ public class BasicRecoverSyncMethodHandl
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicRecoverSyncBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
-        _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
-        AMQChannel channel = session.getChannel(channelId);
+        _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+        AMQChannel channel = connection.getChannel(channelId);
 
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         channel.sync();
         channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
-        if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+        if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9))
         {
-            MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+            MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry();
             AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
-            session.writeFrame(recoverOk.generateFrame(channelId));
+            connection.writeFrame(recoverOk.generateFrame(channelId));
 
         }
-        else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+        else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91))
         {
-            MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+            MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry();
             AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
-            session.writeFrame(recoverOk.generateFrame(channelId));
+            connection.writeFrame(recoverOk.generateFrame(channelId));
 
         }
 

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -27,8 +27,6 @@ import org.apache.qpid.framing.BasicReje
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
@@ -46,15 +44,16 @@ public class BasicRejectMethodHandler im
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               BasicRejectBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = connection.getChannel(channelId);
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
 
         if (_logger.isDebugEnabled())

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
@@ -47,9 +46,10 @@ public class ChannelCloseHandler impleme
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ChannelCloseBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
         if (_logger.isInfoEnabled())
         {
@@ -58,19 +58,19 @@ public class ChannelCloseHandler impleme
         }
 
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = connection.getChannel(channelId);
 
         if (channel == null)
         {
-            throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+            throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry());
         }
         channel.sync();
-        session.closeChannel(channelId);
+        connection.closeChannel(channelId);
         // Client requested closure so we don't wait for ok we send it
-        stateManager.getProtocolSession().closeChannelOk(channelId);
+        connection.closeChannelOk(channelId);
 
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
         ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
-        session.writeFrame(responseBody.generateFrame(channelId));
+        connection.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java Sun Sep 28 15:22:03 2014
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -42,12 +42,14 @@ public class ChannelCloseOkHandler imple
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ChannelCloseOkBody body,
+                               int channelId) throws AMQException
     {
 
         _logger.info("Received channel-close-ok for channel-id " + channelId);
 
         // Let the Protocol Session know the channel is now closed.
-        stateManager.getProtocolSession().closeChannelOk(channelId);
+        connection.closeChannelOk(channelId);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java Sun Sep 28 15:22:03 2014
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.ChannelFl
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
@@ -46,23 +45,24 @@ public class ChannelFlowHandler implemen
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ChannelFlowBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = connection.getChannel(channelId);
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         channel.sync();
         channel.setSuspended(!body.getActive());
         _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
 
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
         AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
-        session.writeFrame(responseBody.generateFrame(channelId));
+        connection.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,11 @@
  */
 package org.apache.qpid.server.protocol.v0_8.handler;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
@@ -33,16 +38,10 @@ import org.apache.qpid.framing.amqp_8_0.
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.UUID;
-
 public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
 {
     private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
@@ -58,10 +57,11 @@ public class ChannelOpenHandler implemen
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ChannelOpenBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHostImpl virtualHost = session.getVirtualHost();
+        VirtualHostImpl virtualHost = connection.getVirtualHost();
 
         // Protect the broker against out of order frame request.
         if (virtualHost == null)
@@ -70,13 +70,13 @@ public class ChannelOpenHandler implemen
         }
         _logger.info("Connecting to: " + virtualHost.getName());
 
-        final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore());
+        final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore());
 
-        session.addChannel(channel);
+        connection.addChannel(channel);
 
         ChannelOpenOkBody response;
 
-        ProtocolVersion pv = session.getProtocolVersion();
+        ProtocolVersion pv = connection.getProtocolVersion();
 
         if(pv.equals(ProtocolVersion.v8_0))
         {
@@ -138,6 +138,6 @@ public class ChannelOpenHandler implemen
         }
 
 
-        session.writeFrame(response.generateFrame(channelId));
+        connection.writeFrame(response.generateFrame(channelId));
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -27,7 +27,6 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -45,28 +44,29 @@ public class ConnectionCloseMethodHandle
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionCloseBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
         if (_logger.isInfoEnabled())
         {
             _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
-                         body.getReplyText() + " for " + session);
+                         body.getReplyText() + " for " + connection);
         }
         try
         {
-            session.closeSession();
+            connection.closeSession();
         }
         catch (Exception e)
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
 
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
         ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
-        session.writeFrame(responseBody.generateFrame(channelId));
+        connection.writeFrame(responseBody.generateFrame(channelId));
 
-        session.closeProtocolSession();
+        connection.closeProtocolSession();
 
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
@@ -44,16 +43,17 @@ public class ConnectionCloseOkMethodHand
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionCloseOkBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
         //todo should this not do more than just log the method?
         _logger.info("Received Connection-close-ok");
 
         try
         {
-            stateManager.changeState(AMQState.CONNECTION_CLOSED);
-            session.closeSession();
+            connection.changeState(AMQState.CONNECTION_CLOSED);
+            connection.closeSession();
         }
         catch (Exception e)
         {

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
@@ -58,9 +57,10 @@ public class ConnectionOpenMethodHandler
         return new AMQShortString(Long.toString(System.currentTimeMillis()));
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionOpenBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
         //ignore leading '/'
         String virtualHostName;
@@ -73,42 +73,44 @@ public class ConnectionOpenMethodHandler
             virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
         }
 
-        VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName);
+        VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName);
 
         if (virtualHost == null)
         {
-            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
+            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+                                              connection.getMethodRegistry());
         }
         else
         {
             // Check virtualhost access
             if (virtualHost.getState() != State.ACTIVE)
             {
-                throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+                throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active",
+                                                  connection.getMethodRegistry());
             }
 
-            session.setVirtualHost(virtualHost);
+            connection.setVirtualHost(virtualHost);
             try
             {
-                virtualHost.getSecurityManager().authoriseCreateConnection(session);
+                virtualHost.getSecurityManager().authoriseCreateConnection(connection);
             }
             catch (AccessControlException e)
             {
-                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
             }
 
             // See Spec (0.8.2). Section  3.1.2 Virtual Hosts
-            if (session.getContextKey() == null)
+            if (connection.getContextKey() == null)
             {
-                session.setContextKey(generateClientID());
+                connection.setContextKey(generateClientID());
             }
 
-            MethodRegistry methodRegistry = session.getMethodRegistry();
+            MethodRegistry methodRegistry = connection.getMethodRegistry();
             AMQMethodBody responseBody =  methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
 
-            stateManager.changeState(AMQState.CONNECTION_OPEN);
+            connection.changeState(AMQState.CONNECTION_OPEN);
 
-            session.writeFrame(responseBody.generateFrame(channelId));
+            connection.writeFrame(responseBody.generateFrame(channelId));
         }
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,19 +55,20 @@ public class ConnectionSecureOkMethodHan
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionSecureOkBody body,
+                               int channelId) throws AMQException
     {
-        Broker<?> broker = stateManager.getBroker();
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        Broker<?> broker = connection.getBroker();
 
-        SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+        SubjectCreator subjectCreator = connection.getSubjectCreator();
 
-        SaslServer ss = session.getSaslServer();
+        SaslServer ss = connection.getSaslServer();
         if (ss == null)
         {
             throw new AMQException("No SASL context set up in session");
         }
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
         SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
         switch (authResult.getStatus())
         {
@@ -78,7 +78,7 @@ public class ConnectionSecureOkMethodHan
                 _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
 
                 // This should be abstracted
-                stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                connection.changeState(AMQState.CONNECTION_CLOSING);
 
                 ConnectionCloseBody connectionCloseBody =
                         methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
@@ -86,15 +86,15 @@ public class ConnectionSecureOkMethodHan
                                                                  body.getClazz(),
                                                                  body.getMethod());
 
-                session.writeFrame(connectionCloseBody.generateFrame(0));
-                disposeSaslServer(session);
+                connection.writeFrame(connectionCloseBody.generateFrame(0));
+                disposeSaslServer(connection);
                 break;
             case SUCCESS:
                 if (_logger.isInfoEnabled())
                 {
                     _logger.info("Connected as: " + authResult.getSubject());
                 }
-                stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                connection.changeState(AMQState.CONNECTION_NOT_TUNED);
 
                 int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
 
@@ -107,15 +107,15 @@ public class ConnectionSecureOkMethodHan
                         methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
                                                                 frameMax,
                                                                 broker.getConnection_heartBeatDelay());
-                session.writeFrame(tuneBody.generateFrame(0));
-                session.setAuthorizedSubject(authResult.getSubject());
-                disposeSaslServer(session);
+                connection.writeFrame(tuneBody.generateFrame(0));
+                connection.setAuthorizedSubject(authResult.getSubject());
+                disposeSaslServer(connection);
                 break;
             case CONTINUE:
-                stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+                connection.changeState(AMQState.CONNECTION_NOT_AUTH);
 
                 ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
-                session.writeFrame(secureBody.generateFrame(0));
+                connection.writeFrame(secureBody.generateFrame(0));
         }
     }
 

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,32 +55,36 @@ public class ConnectionStartOkMethodHand
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionStartOkBody body,
+                               int channelId) throws AMQException
     {
-        Broker<?> broker = stateManager.getBroker();
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        Broker<?> broker = connection.getBroker();
 
         _logger.info("SASL Mechanism selected: " + body.getMechanism());
         _logger.info("Locale selected: " + body.getLocale());
 
-        SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+        SubjectCreator subjectCreator = connection.getSubjectCreator();
         SaslServer ss = null;
         try
         {
-            ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN(), session.getPeerPrincipal());
+            ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+                                                 connection.getLocalFQDN(),
+                                                 connection.getPeerPrincipal());
 
             if (ss == null)
             {
-                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism());
+                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(),
+                                                  connection.getMethodRegistry());
             }
 
-            session.setSaslServer(ss);
+            connection.setSaslServer(ss);
 
             final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
             //save clientProperties
-            session.setClientProperties(body.getClientProperties());
+            connection.setClientProperties(body.getClientProperties());
 
-            MethodRegistry methodRegistry = session.getMethodRegistry();
+            MethodRegistry methodRegistry = connection.getMethodRegistry();
 
             switch (authResult.getStatus())
             {
@@ -90,7 +93,7 @@ public class ConnectionStartOkMethodHand
 
                     _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
 
-                    stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                    connection.changeState(AMQState.CONNECTION_CLOSING);
 
                     ConnectionCloseBody closeBody =
                             methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
@@ -98,8 +101,8 @@ public class ConnectionStartOkMethodHand
                                                                      body.getClazz(),
                                                                      body.getMethod());
 
-                    session.writeFrame(closeBody.generateFrame(0));
-                    disposeSaslServer(session);
+                    connection.writeFrame(closeBody.generateFrame(0));
+                    disposeSaslServer(connection);
                     break;
 
                 case SUCCESS:
@@ -107,9 +110,9 @@ public class ConnectionStartOkMethodHand
                     {
                         _logger.info("Connected as: " + authResult.getSubject());
                     }
-                    session.setAuthorizedSubject(authResult.getSubject());
+                    connection.setAuthorizedSubject(authResult.getSubject());
 
-                    stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                    connection.changeState(AMQState.CONNECTION_NOT_TUNED);
                     int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
 
                     if(frameMax <= 0)
@@ -120,18 +123,18 @@ public class ConnectionStartOkMethodHand
                     ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
                                                                                           frameMax,
                                                                                           broker.getConnection_heartBeatDelay());
-                    session.writeFrame(tuneBody.generateFrame(0));
+                    connection.writeFrame(tuneBody.generateFrame(0));
                     break;
                 case CONTINUE:
-                    stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+                    connection.changeState(AMQState.CONNECTION_NOT_AUTH);
 
                     ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
-                    session.writeFrame(secureBody.generateFrame(0));
+                    connection.writeFrame(secureBody.generateFrame(0));
             }
         }
         catch (SaslException e)
         {
-            disposeSaslServer(session);
+            disposeSaslServer(connection);
             throw new AMQException("SASL error: " + e, e);
         }
     }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 
 public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
@@ -43,19 +42,20 @@ public class ConnectionTuneOkMethodHandl
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ConnectionTuneOkBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
 
         if (_logger.isDebugEnabled())
         {
             _logger.debug(body);
         }
-        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        connection.changeState(AMQState.CONNECTION_NOT_OPENED);
 
-        session.initHeartbeats(body.getHeartbeat());
+        connection.initHeartbeats(body.getHeartbeat());
 
-        int brokerFrameMax = stateManager.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
+        int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
         if(brokerFrameMax <= 0)
         {
             brokerFrameMax = Integer.MAX_VALUE;
@@ -68,7 +68,7 @@ public class ConnectionTuneOkMethodHandl
                                              + "greater than the broker will allow: "
                                              + brokerFrameMax,
                                              body.getClazz(), body.getMethod(),
-                                             body.getMajor(), body.getMinor(),null);
+                                             connection.getMethodRegistry(),null);
         }
         else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
         {
@@ -77,13 +77,13 @@ public class ConnectionTuneOkMethodHandl
                                              + "which is smaller than the specification definined minimum: "
                                              + AMQConstant.FRAME_MIN_SIZE.getCode(),
                                              body.getClazz(), body.getMethod(),
-                                             body.getMajor(), body.getMinor(),null);
+                                             connection.getMethodRegistry(),null);
         }
         int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
-        session.setMaxFrameSize(frameMax);
+        connection.setMaxFrameSize(frameMax);
 
         long maxChannelNumber = body.getChannelMax();
         //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
-        session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
+        connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
@@ -65,16 +64,17 @@ public class ExchangeBoundHandler implem
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ExchangeBoundBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHostImpl virtualHost = session.getVirtualHost();
-        MethodRegistry methodRegistry = session.getMethodRegistry();
+        VirtualHostImpl virtualHost = connection.getVirtualHost();
+        MethodRegistry methodRegistry = connection.getMethodRegistry();
 
-        final AMQChannel channel = session.getChannel(channelId);
+        final AMQChannel channel = connection.getChannel(channelId);
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         channel.sync();
 
@@ -227,7 +227,7 @@ public class ExchangeBoundHandler implem
                 }
             }
         }
-        session.writeFrame(response.generateFrame(channelId));
+        connection.writeFrame(response.generateFrame(channelId));
     }
 
     protected boolean isDefaultExchange(final AMQShortString exchangeName)

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Sun Sep 28 15:22:03 2014
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.NoFa
 import org.apache.qpid.server.model.UnknownConfiguredObjectException;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.ExchangeExistsException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -62,14 +61,15 @@ public class ExchangeDeclareHandler impl
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ExchangeDeclareBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHostImpl virtualHost = session.getVirtualHost();
-        final AMQChannel channel = session.getChannel(channelId);
+        VirtualHostImpl virtualHost = connection.getVirtualHost();
+        final AMQChannel channel = connection.getChannel(channelId);
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
 
         final AMQShortString exchangeName = body.getExchange();
@@ -89,7 +89,7 @@ public class ExchangeDeclareHandler impl
                                                                           + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
                                                                           + " to " + body.getType() +".",
                                                  body.getClazz(), body.getMethod(),
-                                                 body.getMajor(), body.getMinor(),null);
+                                                 connection.getMethodRegistry(),null);
             }
         }
         else
@@ -99,14 +99,15 @@ public class ExchangeDeclareHandler impl
                 exchange = virtualHost.getExchange(exchangeName.toString());
                 if(exchange == null)
                 {
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+                                                   connection.getMethodRegistry());
                 }
                 else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
                 {
 
                     throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
                                       exchangeName + " of type " + exchange.getType()
-                                      + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+                                      + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null);
                 }
 
             }
@@ -139,7 +140,7 @@ public class ExchangeDeclareHandler impl
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                               "Attempt to declare exchange: " + exchangeName +
-                                              " which begins with reserved prefix.");
+                                              " which begins with reserved prefix.", connection.getMethodRegistry());
 
                 }
                 catch(ExchangeExistsException e)
@@ -147,40 +148,44 @@ public class ExchangeDeclareHandler impl
                     exchange = e.getExistingExchange();
                     if(!new AMQShortString(exchange.getType()).equals(body.getType()))
                     {
-                        throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
-                                                                                  + exchangeName + " of type "
-                                                                                  + exchange.getType()
-                                                                                  + " to " + body.getType() +".",
-                                                         body.getClazz(), body.getMethod(),
-                                                         body.getMajor(), body.getMinor(),null);
+                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                                                   + exchangeName + " of type "
+                                                                                   + exchange.getType()
+                                                                                   + " to " + body.getType() + ".",
+                                                          connection.getMethodRegistry());
                     }
                 }
                 catch(NoFactoryForTypeException e)
                 {
-                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry());
                 }
                 catch (AccessControlException e)
                 {
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
                 }
                 catch (UnknownConfiguredObjectException e)
                 {
                     // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
-                    throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
+                    throw body.getConnectionException(AMQConstant.NOT_FOUND,
+                                                      "Unknown alternate exchange "
+                                                        + (e.getName() != null
+                                                              ? "name: \"" + e.getName() + "\""
+                                                              : "id: " + e.getId()),
+                                                      connection.getMethodRegistry());
                 }
                 catch (IllegalArgumentException e)
                 {
-                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry());
                 }
             }
         }
 
         if(!body.getNowait())
         {
-            MethodRegistry methodRegistry = session.getMethodRegistry();
+            MethodRegistry methodRegistry = connection.getMethodRegistry();
             AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
             channel.sync();
-            session.writeFrame(responseBody.generateFrame(channelId));
+            connection.writeFrame(responseBody.generateFrame(channelId));
         }
     }
 

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8.handler;
 
+import java.security.AccessControlException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ExchangeDeleteBody;
@@ -28,14 +30,11 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-import java.security.AccessControlException;
-
 public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
 {
     private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
@@ -49,14 +48,15 @@ public class ExchangeDeleteHandler imple
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
+    public void methodReceived(final AMQProtocolSession<?> connection,
+                               ExchangeDeleteBody body,
+                               int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHostImpl virtualHost = session.getVirtualHost();
-        final AMQChannel channel = session.getChannel(channelId);
+        VirtualHostImpl virtualHost = connection.getVirtualHost();
+        final AMQChannel channel = connection.getChannel(channelId);
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(channelId);
+            throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
         }
         channel.sync();
         try
@@ -64,7 +64,7 @@ public class ExchangeDeleteHandler imple
 
             if(isDefaultExchange(body.getExchange()))
             {
-                throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+                throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry());
             }
 
             final String exchangeName = body.getExchange().toString();
@@ -72,28 +72,31 @@ public class ExchangeDeleteHandler imple
             final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
             if(exchange == null)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+                                               connection.getMethodRegistry());
             }
 
             virtualHost.removeExchange(exchange, !body.getIfUnused());
 
-            ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+            ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody();
 
-            session.writeFrame(responseBody.generateFrame(channelId));
+            connection.writeFrame(responseBody.generateFrame(channelId));
         }
 
         catch (ExchangeIsAlternateException e)
         {
-            throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+            throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+                                           connection.getMethodRegistry());
 
         }
         catch (RequiredExchangeException e)
         {
-            throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
+            throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted",
+                                           connection.getMethodRegistry());
         }
         catch (AccessControlException e)
         {
-            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org