You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/19 23:19:55 UTC

svn commit: r497974 [1/2] - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/state/ cluster/src/main/java/o...

Author: kpvdr
Date: Fri Jan 19 14:19:51 2007
New Revision: 497974

URL: http://svn.apache.org/viewvc?view=rev&rev=497974
Log:
Introduced channel close methods into AMQMinaProtocolSession.java. Refactored StateAwareMethodListener.java to simplify the call and remove redundant parameters, and reworked all affected handlers. Connected the AMQP version information to the protocol session in all broker handlers.

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Jan 19 14:19:51 2007
@@ -47,17 +47,12 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
     {
         ChannelCloseBody body = evt.getMethod();
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9));
-        protocolSession.closeChannel(evt.getChannelId());
+        protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Fri Jan 19 14:19:51 2007
@@ -45,10 +45,10 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
     {
         _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+        protocolSession.removeChannel(evt.getChannelId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Fri Jan 19 14:19:51 2007
@@ -48,8 +48,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
     {
         ChannelFlowBody body = evt.getMethod();
@@ -58,12 +57,11 @@
         channel.setSuspended(!body.active);
         _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
 
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQMethodBody response = ChannelFlowOkBody.createMethodBody
-            ((byte)0, (byte)9,	// AMQP version (major, minor)
-             body.active);	// active
+        AMQMethodBody response = ChannelFlowOkBody.createMethodBody(
+            protocolSession.getMajor(), // AMQP major version
+            protocolSession.getMinor(), // AMQP minor version
+            body.active);	// active
         protocolSession.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Fri Jan 19 14:19:51 2007
@@ -47,15 +47,15 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         // XXX: Client id
-        AMQMethodBody response = ChannelOpenOkBody.createMethodBody((byte)0, (byte)9, "XXX".getBytes());
+        AMQMethodBody response = ChannelOpenOkBody.createMethodBody(
+            protocolSession.getMajor(), // AMQP major version
+            protocolSession.getMinor(), // AMQP minor version
+            "XXX".getBytes());
         protocolSession.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -47,17 +47,16 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
     {
         final ConnectionCloseBody body = evt.getMethod();
         _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
                      body.replyText +  " for " + protocolSession);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
+        protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody(
+            protocolSession.getMajor(),  // AMQP major version
+            protocolSession.getMinor())); // AMQP minor version
         try
         {
             protocolSession.closeSession();

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -46,8 +46,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
     {
         //todo should this not do more than just log the method?
@@ -55,7 +54,7 @@
 
         try
         {
-            stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
             protocolSession.closeSession();
         }
         catch (Exception e)

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -50,8 +50,7 @@
         return Long.toString(System.currentTimeMillis());
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
     {
         ConnectionOpenBody body = evt.getMethod();
@@ -64,13 +63,12 @@
             contextKey = generateClientID();
         }
         protocolSession.setContextKey(contextKey);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQMethodBody response = ConnectionOpenOkBody.createMethodBody
-            ((byte)0, (byte)9,	// AMQP version (major, minor)
-             contextKey);	// knownHosts
-        stateManager.changeState(AMQState.CONNECTION_OPEN);
+        AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
+            protocolSession.getMajor(), // AMQP major version
+            protocolSession.getMinor(), // AMQP minor version
+            contextKey);	// knownHosts
+        protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -54,8 +54,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
     {
         ConnectionSecureOkBody body = evt.getMethod();
@@ -68,6 +67,9 @@
         }
 
         AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+        AMQStateManager stateManager = protocolSession.getStateManager();
+        byte major = protocolSession.getMajor();
+        byte minor = protocolSession.getMinor();
         switch (authResult.status)
         {
             case ERROR:
@@ -75,15 +77,13 @@
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
                 stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                 // Be aware of possible changes to parameter order as versions change.
-                AMQMethodBody close = ConnectionCloseBody.createMethodBody
-                    ((byte)0, (byte)9,	// AMQP version (major, minor)
-                     ConnectionCloseBody.getClazz((byte)0, (byte)9),		// classId
-                     ConnectionCloseBody.getMethod((byte)0, (byte)9),	// methodId
-                     AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                     AMQConstant.NOT_ALLOWED.getName());	// replyText
+                AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+                    major, minor,	// AMQP version (major, minor)
+                    ConnectionCloseBody.getClazz(major, minor),		// classId
+                    ConnectionCloseBody.getMethod(major, minor),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    AMQConstant.NOT_ALLOWED.getName());	// replyText
                 protocolSession.writeResponse(evt, close);
                 disposeSaslServer(protocolSession);
                 break;
@@ -93,25 +93,21 @@
                 // TODO: Check the value of channelMax here: This should be the max
                 // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
                 // not Integer.MAX_VALUE (which is signed 4 bytes).
-                // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                 // Be aware of possible changes to parameter order as versions change.
-                AMQMethodBody tune = ConnectionTuneBody.createMethodBody
-                    ((byte)0, (byte)9,	// AMQP version (major, minor)
-                     Integer.MAX_VALUE,	// channelMax
-                     ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
-                     HeartbeatConfig.getInstance().getDelay());	// heartbeat
+                AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
+                    major, minor,	// AMQP version (major, minor)
+                    Integer.MAX_VALUE,	// channelMax
+                    ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
+                    HeartbeatConfig.getInstance().getDelay());	// heartbeat
                 protocolSession.writeResponse(evt, tune);
                 disposeSaslServer(protocolSession);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                 // Be aware of possible changes to parameter order as versions change.
-                AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
-                    ((byte)0, (byte)9,	// AMQP version (major, minor)
-                     authResult.challenge);	// challenge
+                AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
+                    major, minor,	// AMQP version (major, minor)
+                    authResult.challenge);	// challenge
                 protocolSession.writeResponse(evt, challenge);
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -60,8 +60,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
     {
         final ConnectionStartOkBody body = evt.getMethod();
@@ -84,6 +83,7 @@
                 protocolSession.setClientProperties(body.clientProperties);
             }
 
+            AMQStateManager stateManager = protocolSession.getStateManager();
             switch (authResult.status)
             {
                 case ERROR:
@@ -92,24 +92,22 @@
                     _logger.info("Connected as: " + ss.getAuthorizationID());
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as versions change.
-                    AMQMethodBody tune = ConnectionTuneBody.createMethodBody
-                        ((byte)0, (byte)9,	// AMQP version (major, minor)
-                         Integer.MAX_VALUE,	// channelMax
-                         getConfiguredFrameSize(),	// frameMax
-                         HeartbeatConfig.getInstance().getDelay());	// heartbeat
+                    AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
+                        protocolSession.getMajor(), // AMQP major version
+                        protocolSession.getMinor(), // AMQP minor version
+                        Integer.MAX_VALUE,	// channelMax
+                        getConfiguredFrameSize(),	// frameMax
+                        HeartbeatConfig.getInstance().getDelay());	// heartbeat
                     protocolSession.writeRequest(evt.getChannelId(), tune, stateManager);
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as versions change.
-                    AMQMethodBody challenge = ConnectionSecureBody.createMethodBody
-                        ((byte)0, (byte)9,	// AMQP version (major, minor)
-                         authResult.challenge);	// challenge
+                    AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
+                        protocolSession.getMajor(), // AMQP major version
+                        protocolSession.getMinor(), // AMQP minor version
+                        authResult.challenge);	// challenge
                     protocolSession.writeRequest(evt.getChannelId(), challenge, stateManager);
             }
         }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Jan 19 14:19:51 2007
@@ -42,8 +42,7 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
     {
         ConnectionTuneOkBody body = evt.getMethod();
@@ -51,7 +50,7 @@
         {
             _logger.debug(body);
         }
-        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
         protocolSession.initHeartbeats(body.heartbeat);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Jan 19 14:19:51 2007
@@ -60,14 +60,11 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        byte major = (byte)0;
-        byte minor = (byte)9;
+        byte major = protocolSession.getMajor();
+        byte minor = protocolSession.getMinor();
         
         ExchangeBoundBody body = evt.getMethod();
 
@@ -78,7 +75,7 @@
         {
             throw new AMQException("Exchange exchange must not be null");
         }
-        Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+        Exchange exchange = protocolSession.getExchangeRegistry().getExchange(exchangeName);
         AMQMethodBody response;
         if (exchange == null)
         {
@@ -111,7 +108,7 @@
             }
             else
             {
-                AMQQueue queue = queueRegistry.getQueue(queueName);
+                AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
                 if (queue == null)
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
@@ -143,7 +140,7 @@
         }
         else if (queueName != null)
         {
-            AMQQueue queue = queueRegistry.getQueue(queueName);
+            AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
             if (queue == null)
             {
                 // AMQP version change:  Be aware of possible changes to parameter order as versions change.

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Jan 19 14:19:51 2007
@@ -53,8 +53,7 @@
         exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
     {
         final ExchangeDeclareBody body = evt.getMethod();
@@ -62,6 +61,7 @@
         {
             _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
         }
+        ExchangeRegistry exchangeRegistry = protocolSession.getExchangeRegistry();
         synchronized(exchangeRegistry)
         {
             Exchange exchange = exchangeRegistry.getExchange(body.exchange);
@@ -75,10 +75,10 @@
         }
         if(!body.nowait)
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody((byte)0, (byte)9);
+            AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor()); // AMQP minor version
             protocolSession.writeResponse(evt, response);
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Jan 19 14:19:51 2007
@@ -45,18 +45,17 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
     {
         ExchangeDeleteBody body = evt.getMethod();
         try
         {
-            exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            protocolSession.getExchangeRegistry().unregisterExchange(body.exchange, body.ifUnused);
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody((byte)0, (byte)9));
+            protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor())); // AMQP minor version
         }
         catch (ExchangeInUseException e)
         {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageAppendHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageAppendBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java Fri Jan 19 14:19:51 2007
@@ -44,10 +44,7 @@
     private MessageCancelHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageCancelBody> evt)
                                 throws AMQException
     {
@@ -55,10 +52,10 @@
         final MessageCancelBody body = evt.getMethod();
         channel.unsubscribeConsumer(protocolSession, body.destination);
         
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9);
+        final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+            protocolSession.getMajor(), // AMQP major version
+            protocolSession.getMinor()); // AMQP minor version
         protocolSession.writeResponse(evt, methodBody);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageCheckpointHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageCheckpointBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageCloseHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageCloseBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Fri Jan 19 14:19:51 2007
@@ -53,10 +53,7 @@
     private MessageConsumeHandler() {}
 
 
-    public void methodReceived (AMQStateManager stateManager,
-                                QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession session,
+    public void methodReceived (AMQProtocolSession session,
                                	AMQMethodEvent<MessageConsumeBody> evt)
                                 throws AMQException
     {
@@ -71,19 +68,21 @@
         }
         else
         {
-            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : session.getQueueRegistry().getQueue(body.queue);
 
             if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
                 if(body.queue!=null)
                 {
-                    channelClose(session, channelId, stateManager,
-                                 "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
+                    session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
+                        "No such queue, '" + body.queue + "'");
+//                     channelClose(session, channelId, stateManager,
+//                                  "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
                 }
                 else
                 {
-                    connectionClose(session, channelId, stateManager,
+                    connectionClose(session, channelId, session.getStateManager(),
                                     "No queue name provided, no default queue defined.",
                                     AMQConstant.NOT_ALLOWED);
                 }
@@ -94,10 +93,10 @@
                 {
                     /*AMQShort*/String destination = channel.subscribeToQueue
                         (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as versions change.
-                    session.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
+                    session.writeResponse(evt, MessageOkBody.createMethodBody(
+                        session.getMajor(), // AMQP major version
+                        session.getMinor())); // AMQP minor version
 
                     //now allow queue to start async processing of any backlog of messages
                     queue.deliverAsync();
@@ -105,11 +104,13 @@
                 catch (AMQInvalidSelectorException ise)
                 {
                     _log.info("Closing connection due to invalid selector");
-                    channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
+                    session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
+                        ise.getMessage());
+//                    channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
-                    connectionClose(session, channelId, stateManager,
+                    connectionClose(session, channelId, session.getStateManager(),
                                     "Non-unique consumer tag, '" + body.destination + "'",
                                     AMQConstant.NOT_ALLOWED);
                 }
@@ -117,37 +118,37 @@
         }
     }
 
-    private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
-                              String message, AMQConstant code)
-        throws AMQException
-    {
-        /*AMQShort*/String msg = new /*AMQShort*/String(message);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeRequest(channelId, ChannelCloseBody.createMethodBody
-                             ((byte)0, (byte)9,	// AMQP version (major, minor)
-                              MessageConsumeBody.getClazz((byte)0, (byte)9),	// classId
-                              MessageConsumeBody.getMethod((byte)0, (byte)9),	// methodId
-                              code.getCode(),	// replyCode
-                              msg),	// replyText
-                             listener);
-    }
+//     private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
+//                               String message, AMQConstant code)
+//         throws AMQException
+//     {
+//         /*AMQShort*/String msg = new /*AMQShort*/String(message);
+//         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+//         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+//         // Be aware of possible changes to parameter order as versions change.
+//         session.writeRequest(channelId, ChannelCloseBody.createMethodBody
+//                              ((byte)0, (byte)9,	// AMQP version (major, minor)
+//                               MessageConsumeBody.getClazz((byte)0, (byte)9),	// classId
+//                               MessageConsumeBody.getMethod((byte)0, (byte)9),	// methodId
+//                               code.getCode(),	// replyCode
+//                               msg),	// replyText
+//                              listener);
+//     }
 
     private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
                                  String message, AMQConstant code)
         throws AMQException
     {
+        byte major = session.getMajor();
+        byte minor = session.getMinor();
         /*AMQShort*/String msg = new /*AMQShort*/String(message);
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        session.writeRequest(channelId, ConnectionCloseBody.createMethodBody
-                             ((byte)0, (byte)9,	// AMQP version (major, minor)
-                              MessageConsumeBody.getClazz((byte)0, (byte)9),	// classId
-                              MessageConsumeBody.getMethod((byte)0, (byte)9),	// methodId
-                              code.getCode(),	// replyCode
-                              msg),	// replyText
+        session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
+                                major, minor,	// AMQP version (major, minor)
+                                MessageConsumeBody.getClazz(major, minor),	// classId
+                                MessageConsumeBody.getMethod(major, minor),	// methodId
+                                code.getCode(),	// replyCode
+                                msg),	// replyText
                              listener);
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageEmptyHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageEmptyBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Fri Jan 19 14:19:51 2007
@@ -40,10 +40,7 @@
 
     private MessageGetHandler() {}
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageGetBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageOffsetHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageOffsetBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageOkHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageOkBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageOpenHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageOpenBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Fri Jan 19 14:19:51 2007
@@ -42,18 +42,15 @@
     private MessageQosHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageQosBody> evt)
                                 throws AMQException
     {
         protocolSession.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody((byte)0, (byte)9));
+        protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody(
+            protocolSession.getMajor(), // AMQP major version
+            protocolSession.getMinor())); // AMQP minor version
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageRecoverHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageRecoverBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageRejectHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageRejectBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java Fri Jan 19 14:19:51 2007
@@ -41,10 +41,7 @@
     private MessageResumeHandler() {}
     
     
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageResumeBody> evt)
                                 throws AMQException
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Fri Jan 19 14:19:51 2007
@@ -51,10 +51,7 @@
 
     private MessageTransferHandler() {}
 
-    public void methodReceived (AMQStateManager stateManager,
-    							QueueRegistry queueRegistry,
-                              	ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+    public void methodReceived (AMQProtocolSession protocolSession,
                                	AMQMethodEvent<MessageTransferBody> evt)
                                 throws AMQException
     {
@@ -68,29 +65,32 @@
         if (body.destination == null) {
             body.destination = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
         }
-        Exchange e = exchangeRegistry.getExchange(body.destination);
+        Exchange e = protocolSession.getExchangeRegistry().getExchange(body.destination);
         // if the exchange does not exist we raise a channel exception
         if (e == null) {
-            protocolSession.closeChannel(evt.getChannelId());
-            // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
-            // then we can remove the hardcoded 0,0
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQMethodBody cf = ChannelCloseBody.createMethodBody
-                ((byte)0, (byte)9,	// AMQP version (major, minor)
-                 MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
-                 MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
-                 500,	// replyCode
-                 UNKNOWN_EXCHANGE_NAME);	// replyText
-            protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+//             protocolSession.closeChannel(evt.getChannelId());
+//             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
+//             // then we can remove the hardcoded 0,0
+//             // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+//             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+//             // Be aware of possible changes to parameter order as versions change.
+//             AMQMethodBody cf = ChannelCloseBody.createMethodBody
+//                 ((byte)0, (byte)9,	// AMQP version (major, minor)
+//                  MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
+//                  MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
+//                  500,	// replyCode
+//                  UNKNOWN_EXCHANGE_NAME);	// replyText
+//             protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+            protocolSession.closeChannelRequest(evt.getChannelId(), 500, UNKNOWN_EXCHANGE_NAME);
         } else {
             // The partially populated BasicDeliver frame plus the received route body
             // is stored in the channel. Once the final body frame has been received
             // it is routed to the exchange.
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.addMessageTransfer(body, protocolSession);
-            protocolSession.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
+            protocolSession.writeResponse(evt, MessageOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor())); // AMQP minor version
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Jan 19 14:19:51 2007
@@ -50,8 +50,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<QueueBindBody> evt) throws AMQException
     {
         final QueueBindBody body = evt.getMethod();
@@ -70,14 +69,14 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = protocolSession.getQueueRegistry().getQueue(body.queue);
         }
 
         if (queue == null)
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
         }
-        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+        final Exchange exch = protocolSession.getExchangeRegistry().getExchange(body.exchange);
         if (exch == null)
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
@@ -90,10 +89,10 @@
         }
         if (!body.nowait)
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            final AMQMethodBody response = QueueBindOkBody.createMethodBody((byte)0, (byte)9);
+            final AMQMethodBody response = QueueBindOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor()); // AMQP minor version
             protocolSession.writeResponse(evt, response);
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Jan 19 14:19:51 2007
@@ -65,8 +65,7 @@
         _store = ApplicationRegistry.getInstance().getMessageStore();
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
     {
         QueueDeclareBody body = evt.getMethod();
@@ -78,6 +77,7 @@
         }
         //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
 
+        QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
         synchronized (queueRegistry)
         {
             AMQQueue queue;
@@ -91,7 +91,7 @@
                 queueRegistry.registerQueue(queue);
                 if (autoRegister)
                 {
-                    Exchange defaultExchange = exchangeRegistry.getExchange("amq.direct");
+                    Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
                     defaultExchange.registerQueue(body.queue, queue, null);
                     queue.bind(body.queue, defaultExchange);
                     _log.info("Queue " + body.queue + " bound to default exchange");
@@ -102,14 +102,13 @@
         }
         if (!body.nowait)
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            AMQMethodBody response = QueueDeclareOkBody.createMethodBody
-                ((byte)0, (byte)9,	// AMQP version (major, minor)
-                 0L, // consumerCount
-                 0L, // messageCount
-                 body.queue); // queue
+            AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor(), // AMQP minor version
+                0L, // consumerCount
+                0L, // messageCount
+                body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeResponse(evt, response);
         }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Jan 19 14:19:51 2007
@@ -57,7 +57,7 @@
 
     }
 
-    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
     {
         QueueDeleteBody body = evt.getMethod();
         AMQQueue queue;
@@ -67,7 +67,7 @@
         }
         else
         {
-            queue = queues.getQueue(body.queue);
+            queue = session.getQueueRegistry().getQueue(body.queue);
         }
 
         if(queue == null)
@@ -81,12 +81,11 @@
         {
             int purged = queue.delete(body.ifUnused, body.ifEmpty);
             _store.removeQueue(queue.getName());
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            session.writeResponse(evt, QueueDeleteOkBody.createMethodBody
-                                  ((byte)0, (byte)9,	// AMQP version (major, minor)
-                                   purged));	// messageCount
+            session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
+                session.getMajor(), // AMQP major version
+                session.getMinor(), // AMQP minor version
+                purged));	// messageCount
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Jan 19 14:19:51 2007
@@ -44,18 +44,17 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<TxCommitBody> evt) throws AMQException
     {
 
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.commit();
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody((byte)0, (byte)9));
+            protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor())); // AMQP minor version
             channel.processReturns(protocolSession);
         }catch(AMQException e){
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Jan 19 14:19:51 2007
@@ -44,17 +44,16 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<TxRollbackBody> evt) throws AMQException
     {
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.rollback();
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody((byte)0, (byte)9));
+            protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor())); // AMQP minor version
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             channel.resend(protocolSession);

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Fri Jan 19 14:19:51 2007
@@ -43,14 +43,13 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+    public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<TxSelectBody> evt) throws AMQException
     {
         protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody((byte)0, (byte)9));
+        protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody(
+                protocolSession.getMajor(), // AMQP major version
+                protocolSession.getMinor())); // AMQP minor version
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jan 19 14:19:51 2007
@@ -29,6 +29,8 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
@@ -40,6 +42,7 @@
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -270,6 +273,15 @@
         return requestManager.sendRequest(methodBody, methodListener);
     }
 
+    // This version uses this session's instance of AMQStateManager as the listener
+    public long writeRequest(int channelNum, AMQMethodBody methodBody)
+        throws AMQException
+    {
+        AMQChannel channel = getChannel(channelNum);
+        RequestManager requestManager = channel.getRequestManager();
+        return requestManager.sendRequest(methodBody, _stateManager);
+    }
+
     public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
         throws AMQException
     {
@@ -371,7 +383,8 @@
      * @throws AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
      */
-    public void closeChannel(int channelId) throws AMQException
+    // Used to close a channel as a response to a client close request
+    public void closeChannelResponse(int channelId, long requestId) throws AMQException
     {
         final AMQChannel channel = _channelMap.get(channelId);
         if (channel == null)
@@ -383,6 +396,8 @@
             try
             {
                 channel.close(this);
+                // Be aware of possible changes to parameter order as versions change.
+                writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor));
             }
             finally
             {
@@ -390,6 +405,28 @@
             }
         }
     }
+    
+    // Used to close a channel from the server side and inform the client
+    public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+    {
+        final AMQChannel channel = _channelMap.get(channelId);
+        if (channel == null)
+        {
+            throw new IllegalArgumentException("Unknown channel id");
+        }
+        else
+        {
+            channel.close(this);
+            // Be aware of possible changes to parameter order as versions change.
+            AMQMethodBody cf = ChannelCloseBody.createMethodBody
+                (_major, _minor,	// AMQP version (major, minor)
+                MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
+                MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
+                replyCode,	// replyCode
+                replyText);	// replyText
+            writeRequest(channelId, cf);
+        }
+    }
 
     /**
      * In our current implementation this is used by the clustering code.
@@ -510,6 +547,16 @@
         _clientProperties = clientProperties;
     }
     
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+    
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+    
     public AMQStateManager getStateManager()
     {
         return _stateManager;
@@ -520,12 +567,12 @@
      * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
      */
 
-    public byte getAmqpMajor()
+    public byte getMajor()
     {
         return _major;
     }
 
-    public byte getAmqpMinor()
+    public byte getMinor()
     {
         return _minor;
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Jan 19 14:19:51 2007
@@ -25,6 +25,9 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
 
 import javax.security.sasl.SaslServer;
 
@@ -73,10 +76,13 @@
      * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
      * </ul>
      * @param channelId id of the channel to close
+     * @param requestId id of the request that initiated the close, used in response
      * @throws org.apache.qpid.AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
      */
-    void closeChannel(int channelId) throws AMQException;
+    void closeChannelResponse(int channelId, long requestId) throws AMQException;
+    
+    void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
 
     /**
      * Remove a channel from the session but do not close it.
@@ -124,4 +130,11 @@
     FieldTable getClientProperties();
 
     void setClientProperties(FieldTable clientProperties);
+    
+    QueueRegistry getQueueRegistry();
+    ExchangeRegistry getExchangeRegistry();
+    AMQStateManager getStateManager();
+    byte getMajor();
+    byte getMinor();
+    boolean amqpVersionEquals(byte major, byte minor);
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Jan 19 14:19:51 2007
@@ -178,7 +178,7 @@
         StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
+            handler.methodReceived(_protocolSession, evt);
             return true;
         }
         return false;

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?view=diff&rev=497974&r1=497973&r2=497974
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Fri Jan 19 14:19:51 2007
@@ -34,7 +34,7 @@
  */
 public interface StateAwareMethodListener <B extends AMQMethodBody>
 {
-    void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
-                        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
-                        AMQMethodEvent<B> evt) throws AMQException;
+    void methodReceived(AMQProtocolSession protocolSession, 
+        AMQMethodEvent<B> evt)
+        throws AMQException;
 }