You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/11/24 22:14:23 UTC

svn commit: r597918 [8/12] - in /incubator/qpid/branches/M2.1: gentools/ gentools/lib/ gentools/src/org/apache/qpid/gentools/ java/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/...

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Sat Nov 24 13:14:14 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,16 +48,16 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ChannelCloseBody body = evt.getMethod();
+
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
-                         " and method " + body.methodId);
+            _logger.info("Received channel close for id " + channelId + " citing class " + body.getClassId() +
+                         " and method " + body.getMethodId());
         }
-        int channelId = evt.getChannelId();
+
 
         AMQChannel channel = session.getChannel(channelId);
 
@@ -69,10 +70,8 @@
         // Client requested closure so we don't wait for ok we send it
         stateManager.getProtocolSession().closeChannelOk(channelId);
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
-        session.writeFrame(response);
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Sat Nov 24 13:14:14 2007
@@ -42,9 +42,9 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
     {
-        int channelId = evt.getChannelId();
+
         _logger.info("Received channel-close-ok for channel-id " + channelId);
         
         // Let the Protocol Session know the channel is now closed.

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Sat Nov 24 13:14:14 2007
@@ -22,9 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -46,27 +44,23 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ChannelFlowBody body = evt.getMethod();
 
-        AMQChannel channel = session.getChannel(evt.getChannelId());
+
+        AMQChannel channel = session.getChannel(channelId);
 
         if (channel == null)
         {
-            throw body.getChannelNotFoundException(evt.getChannelId());
+            throw body.getChannelNotFoundException(channelId);
         }
 
-        channel.setSuspended(!body.active);
-        _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
+        channel.setSuspended(!body.getActive());
+        _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
-            (byte)8, (byte)0,	// AMQP version (major, minor)
-            body.active);	// active
-        session.writeFrame(response);
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+        session.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Sat Nov 24 13:14:14 2007
@@ -20,11 +20,17 @@
  */
 package org.apache.qpid.server.handler;
 
+import java.util.UUID;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -44,18 +50,55 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
 
-        final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
+        final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
                                                   virtualHost.getExchangeRegistry());
         session.addChannel(channel);
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
-        session.writeFrame(response);
+
+        ChannelOpenOkBody response;
+
+        ProtocolVersion pv = session.getProtocolVersion();
+
+        if(pv.equals(ProtocolVersion.v8_0))
+        {
+            MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+            response = methodRegistry.createChannelOpenOkBody();
+
+        }
+        else if(pv.equals(ProtocolVersion.v0_9))
+        {
+            MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+            UUID uuid = UUID.randomUUID();
+            ByteArrayOutputStream output = new ByteArrayOutputStream();
+            DataOutputStream dataOut = new DataOutputStream(output);
+            try
+            {
+                dataOut.writeLong(uuid.getMostSignificantBits());
+                dataOut.writeLong(uuid.getLeastSignificantBits());
+                dataOut.flush();
+                dataOut.close();
+            }
+            catch (IOException e)
+            {
+                // This *really* shouldn't happen as we're not doing any I/O
+                throw new RuntimeException("I/O exception when writing to byte array", e);
+            }
+
+            // should really associate this channelId to the session
+            byte[] channelName = output.toByteArray();
+            
+            response = methodRegistry.createChannelOpenOkBody(channelName);
+        }
+        else
+        {
+            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
+        }
+
+
+        session.writeFrame(response.generateFrame(channelId));
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -45,14 +46,14 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        final ConnectionCloseBody body = evt.getMethod();
+
         if (_logger.isInfoEnabled())
         {
-            _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
-                         body.replyText + " for " + session);
+            _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+                         body.getReplyText() + " for " + session);
         }
         try
         {
@@ -62,10 +63,10 @@
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
-        session.writeFrame(response);
+
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
+
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -44,7 +44,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         //todo should this not do more than just log the method?

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -21,10 +21,7 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,20 +53,20 @@
         return new AMQShortString(Long.toString(System.currentTimeMillis()));
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ConnectionOpenBody body = evt.getMethod();
+
 
         //ignore leading '/'
         String virtualHostName;
-        if ((body.virtualHost != null) && body.virtualHost.charAt(0) == '/')
+        if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
         {
-            virtualHostName = new StringBuilder(body.virtualHost.subSequence(1, body.virtualHost.length())).toString();
+            virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
         }
         else
         {
-            virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
+            virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
         }
 
         VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
@@ -105,14 +102,14 @@
                 session.setContextKey(generateClientID());
             }
 
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short) 0,
-                                                                    (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                    body.virtualHost);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
             stateManager.changeState(AMQState.CONNECTION_OPEN);
-            session.writeFrame(response);
+
+            session.writeFrame(responseBody.generateFrame(channelId));
+
+            
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -25,11 +25,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -57,10 +53,10 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ConnectionSecureOkBody body = evt.getMethod();
+
 
         //fixme Vhost not defined yet
         //session.getVirtualHost().getAuthenticationManager();
@@ -71,8 +67,8 @@
         {
             throw new AMQException("No SASL context set up in session");
         }
-
-        AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
         switch (authResult.status)
         {
             case ERROR:
@@ -80,45 +76,34 @@
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
                 stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
-                    (byte)8, (byte)0,	// AMQP version (major, minor)
-                    ConnectionCloseBody.getClazz((byte)8, (byte)0),		// classId
-                    ConnectionCloseBody.getMethod((byte)8, (byte)0),	// methodId
-                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                    AMQConstant.NOT_ALLOWED.getName());	// replyText
-                session.writeFrame(close);
+
+
+                ConnectionCloseBody connectionCloseBody =
+                        methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+                                                                 AMQConstant.NOT_ALLOWED.getName(),
+                                                                 body.getClazz(),
+                                                                 body.getMethod());
+
+                session.writeFrame(connectionCloseBody.generateFrame(0) );
                 disposeSaslServer(session);
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                // TODO: Check the value of channelMax here: This should be the max
-                // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
-                // not Integer.MAX_VALUE (which is signed 4 bytes).
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
-                    (byte)8, (byte)0,	// AMQP version (major, minor)
-                    Integer.MAX_VALUE,	// channelMax
-                    ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
-                    HeartbeatConfig.getInstance().getDelay());	// heartbeat
-                session.writeFrame(tune);
+
+                ConnectionTuneBody tuneBody =
+                        methodRegistry.createConnectionTuneBody(0xFFFF,
+                                                                ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
+                                                                HeartbeatConfig.getInstance().getDelay());
+                session.writeFrame(tuneBody.generateFrame(0));
                 session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));                
                 disposeSaslServer(session);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
-                    (byte)8, (byte)0,	// AMQP version (major, minor)
-                    authResult.challenge);	// challenge
-                session.writeFrame(challenge);
+
+                ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+                session.writeFrame(secureBody.generateFrame(0));
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -26,11 +26,7 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -53,7 +49,7 @@
 
     private static final int DEFAULT_FRAME_SIZE = 65536;
 
-    public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
+    public static ConnectionStartOkMethodHandler getInstance()
     {
         return _instance;
     }
@@ -62,51 +58,51 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        final ConnectionStartOkBody body = evt.getMethod();
-        _logger.info("SASL Mechanism selected: " + body.mechanism);
-        _logger.info("Locale selected: " + body.locale);
+
+        _logger.info("SASL Mechanism selected: " + body.getMechanism());
+        _logger.info("Locale selected: " + body.getLocale());
 
         AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();//session.getVirtualHost().getAuthenticationManager();
 
         SaslServer ss = null;
         try
         {                       
-            ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+            ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
 
             if (ss == null)
             {
-                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.mechanism
+                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()
                 );
             }
 
             session.setSaslServer(ss);
 
-            AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+            AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
 
             //save clientProperties
             if (session.getClientProperties() == null)
             {
-                session.setClientProperties(body.clientProperties);
+                session.setClientProperties(body.getClientProperties());
             }
 
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+
             switch (authResult.status)
             {
                 case ERROR:
                     _logger.info("Authentication failed");
                     stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
-                                                                        (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                        ConnectionCloseBody.getClazz((byte) 8, (byte) 0),        // classId
-                                                                        ConnectionCloseBody.getMethod((byte) 8, (byte) 0),    // methodId
-                                                                        AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
-                                                                        AMQConstant.NOT_ALLOWED.getName());    // replyText
-                    session.writeFrame(close);
+
+                    ConnectionCloseBody closeBody =
+                            methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
+                                                                     AMQConstant.NOT_ALLOWED.getName(),
+                                                                     body.getClazz(),
+                                                                     body.getMethod());
+
+                    session.writeFrame(closeBody.generateFrame(0));
                     disposeSaslServer(session);
                     break;
 
@@ -115,25 +111,17 @@
                     session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
-                                                                      (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                      Integer.MAX_VALUE,    // channelMax
-                                                                      getConfiguredFrameSize(),    // frameMax
-                                                                      HeartbeatConfig.getInstance().getDelay());    // heartbeat
-                    session.writeFrame(tune);
+
+                    ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+                                                                                          getConfiguredFrameSize(),
+                                                                                          HeartbeatConfig.getInstance().getDelay());
+                    session.writeFrame(tuneBody.generateFrame(0));
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
-                                                                             (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                             authResult.challenge);    // challenge
-                    session.writeFrame(challenge);
+
+                    ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+                    session.writeFrame(secureBody.generateFrame(0));
             }
         }
         catch (SaslException e)

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -40,15 +40,15 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ConnectionTuneOkBody body = evt.getMethod();
+
         if (_logger.isDebugEnabled())
         {
             _logger.debug(body);
         }
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
-        session.initHeartbeats(body.heartbeat);
+        session.initHeartbeats(body.getHeartbeat());
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Sat Nov 24 13:14:14 2007
@@ -21,10 +21,8 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,6 +34,8 @@
 
 /**
  * @author Apache Software Foundation
+ *
+ *
  */
 public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
 {
@@ -64,35 +64,32 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
+        
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        MethodRegistry methodRegistry = session.getMethodRegistry();
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        byte major = (byte)8;
-        byte minor = (byte)0;
         
-        ExchangeBoundBody body = evt.getMethod();
 
-        AMQShortString exchangeName = body.exchange;
-        AMQShortString queueName = body.queue;
-        AMQShortString routingKey = body.routingKey;
+
+        AMQShortString exchangeName = body.getExchange();
+        AMQShortString queueName = body.getQueue();
+        AMQShortString routingKey = body.getRoutingKey();
         if (exchangeName == null)
         {
             throw new AMQException("Exchange exchange must not be null");
         }
         Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
-        AMQFrame response;
+        ExchangeBoundOkBody response;
         if (exchange == null)
         {
-            // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                major, minor,	// AMQP version (major, minor)
-                EXCHANGE_NOT_FOUND,	// replyCode
-                new AMQShortString("Exchange " + exchangeName + " not found"));	// replyText
+
+
+            response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+                                                                new AMQShortString("Exchange " + exchangeName + " not found"));
         }
         else if (routingKey == null)
         {
@@ -100,18 +97,12 @@
             {
                 if (exchange.hasBindings())
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        OK,	// replyCode
-                        null);	// replyText
+                    response = methodRegistry.createExchangeBoundOkBody(OK, null);
                 }
                 else
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        NO_BINDINGS,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,	// replyCode
                         null);	// replyText
                 }
             }
@@ -121,28 +112,22 @@
                 AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        QUEUE_NOT_FOUND,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
                         new AMQShortString("Queue " + queueName + " not found"));	// replyText
                 }
                 else
                 {
                     if (exchange.isBound(queue))
                     {
-                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
-                            OK,	// replyCode
+
+                        response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                             null);	// replyText
                     }
                     else
                     {
-                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
-                            QUEUE_NOT_BOUND,	// replyCode
+
+                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,	// replyCode
                             new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName));	// replyText
                     }
                 }
@@ -153,53 +138,43 @@
             AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    QUEUE_NOT_FOUND,	// replyCode
+
+                response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
                     new AMQShortString("Queue " + queueName + " not found"));	// replyText
             }
             else
             {
-                if (exchange.isBound(body.routingKey, queue))
+                if (exchange.isBound(body.getRoutingKey(), queue))
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        OK,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                         null);	// replyText
                 }
                 else
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
                         new AMQShortString("Queue " + queueName + " not bound with routing key " +
-                        body.routingKey + " to exchange " + exchangeName));	// replyText
+                        body.getRoutingKey() + " to exchange " + exchangeName));	// replyText
                 }
             }
         }
         else
         {
-            if (exchange.isBound(body.routingKey))
+            if (exchange.isBound(body.getRoutingKey()))
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    OK,	// replyCode
+
+                response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                     null);	// replyText
             }
             else
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                    new AMQShortString("No queue bound with routing key " + body.routingKey +
+
+                response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                    new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
                     " to exchange " + exchangeName));	// replyText
             }
         }
-        session.writeFrame(response);
+        session.writeFrame(response.generateFrame(channelId));
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Sat Nov 24 13:14:14 2007
@@ -24,9 +24,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
@@ -54,61 +52,60 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
         
-        final ExchangeDeclareBody body = evt.getMethod();
+
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+            _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
         }
         synchronized(exchangeRegistry)
         {
-            Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+            Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
 
 
 
             if (exchange == null)
             {
-                if(body.passive && ((body.type == null) || body.type.length() ==0))
+                if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
                 {
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);                    
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
                 }
                 else
                 {
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
-                                                              body.type == null ? null : body.type.intern(), 
-                                                              body.durable,
-                                                              body.passive, body.ticket);
+                    exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+                                                              body.getType() == null ? null : body.getType().intern(),
+                                                              body.getDurable(),
+                                                              body.getPassive(), body.getTicket());
                     exchangeRegistry.registerExchange(exchange);
                     }
                     catch(AMQUnknownExchangeType e)
                     {
-                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
                     }
                 }
             }
-            else if (!exchange.getType().equals(body.type))
+            else if (!exchange.getType().equals(body.getType()))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());    
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
             }
 
         }
-        if(!body.nowait)
+        if(!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Sat Nov 24 13:14:14 2007
@@ -45,21 +45,20 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
 
-        ExchangeDeleteBody body = evt.getMethod();
+
         try
         {
-            exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
-            session.writeFrame(response);
+            exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+            ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+                        
+            session.writeFrame(responseBody.generateFrame(channelId));
         }
         catch (ExchangeInUseException e)
         {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Sat Nov 24 13:14:14 2007
@@ -23,9 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
@@ -53,22 +51,24 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        final QueueBindBody body = evt.getMethod();
+
         final AMQQueue queue;
-        if (body.queue == null)
+        final AMQShortString routingKey;
+
+        if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             queue = channel.getDefaultQueue();
@@ -78,41 +78,42 @@
                 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
             }
 
-            if (body.routingKey == null)
+            if (body.getRoutingKey() == null)
+            {
+                routingKey = queue.getName();
+            }
+            else
             {
-                body.routingKey = queue.getName();
+                routingKey = body.getRoutingKey().intern();
             }
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
+            routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
         }
 
         if (queue == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
         }
-        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
         if (exch == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
         }
 
-        if (body.routingKey != null)
-        {
-            body.routingKey = body.routingKey.intern();
-        }
 
         try
         {
-            if (!exch.isBound(body.routingKey, body.arguments, queue))
+            if (!exch.isBound(routingKey, body.getArguments(), queue))
             {
-                queue.bind(body.routingKey, body.arguments, exch);
+                queue.bind(routingKey, body.getArguments(), exch);
             }
         }
         catch (AMQInvalidRoutingKeyException rke)
         {
-            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
         }
         catch (AMQException e)
         {
@@ -121,15 +122,14 @@
 
         if (_log.isInfoEnabled())
         {
-            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
         }
-        if (!body.nowait)
+        if (!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sat Nov 24 13:14:14 2007
@@ -28,10 +28,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.configuration.Configurator;
@@ -70,7 +67,7 @@
         Configurator.configure(this);
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
@@ -78,12 +75,20 @@
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore store = virtualHost.getMessageStore();
 
-        QueueDeclareBody body = evt.getMethod();
+
+
+
+        final AMQShortString queueName;
 
         // if we aren't given a queue name, we create one which we return to the client
-        if (body.queue == null)
+
+        if (body.getQueue() == null)
         {
-            body.queue = createName();
+            queueName = createName();
+        }
+        else
+        {
+            queueName = body.getQueue().intern();
         }
 
         AMQQueue queue;
@@ -94,16 +99,12 @@
 
 
 
-            if (((queue = queueRegistry.getQueue(body.queue)) == null))
+            if (((queue = queueRegistry.getQueue(queueName)) == null))
             {
-                if(body.queue != null)
-                {
-                    body.queue = body.queue.intern();
-                }
 
-                if (body.passive)
+                if (body.getPassive())
                 {
-                    String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+                    String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
                     throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
                 }
                 else
@@ -118,42 +119,40 @@
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                        queue.bind(body.queue, null, defaultExchange);
-                        _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
+                        queue.bind(queueName, null, defaultExchange);
+                        _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
                     }
                 }
             }
             else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
             {
-                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
                                                                            + "declared on another client ID('"
                                                                            + queue.getOwner() + "')");
             }
 
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             //set this as the default queue on the channel:
             channel.setDefaultQueue(queue);
         }
 
-        if (!body.nowait)
+        if (!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
-                                                                  (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                  queue.getConsumerCount(), // consumerCount
-                                                                  queue.getMessageCount(), // messageCount
-                                                                  body.queue); // queue
-            _log.info("Queue " + body.queue + " declared successfully");
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            QueueDeclareOkBody responseBody =
+                    methodRegistry.createQueueDeclareOkBody(queueName,
+                                                            queue.getMessageCount(),
+                                                            queue.getConsumerCount());
+            session.writeFrame(responseBody.generateFrame(channelId));
+
+            _log.info("Queue " + queueName + " declared successfully");
         }
     }
 
@@ -166,11 +165,11 @@
             throws AMQException
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
-        AMQShortString owner = body.exclusive ? session.getContextKey() : null;
-        final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+        final AMQQueue queue = new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost);
         final AMQShortString queueName = queue.getName();
 
-        if (body.exclusive && !body.durable)
+        if (body.getExclusive() && !body.getDurable())
         {
             final AMQProtocolSession.Task deleteQueueTask =
                     new AMQProtocolSession.Task()

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sat Nov 24 13:14:14 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,22 +57,21 @@
 
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore store = virtualHost.getMessageStore();
 
-        QueueDeleteBody body = evt.getMethod();
         AMQQueue queue;
-        if (body.queue == null)
+        if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             //get the default queue on the channel:            
@@ -79,43 +79,40 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if (queue == null)
         {
             if (_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
         {
-            if (body.ifEmpty && !queue.isEmpty())
+            if (body.getIfEmpty() && !queue.isEmpty())
             {
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty.");
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
             }
-            else if (body.ifUnused && !queue.isUnused())
+            else if (body.getIfUnused() && !queue.isUnused())
             {
                 // TODO - Error code
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used.");
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
 
             }
             else
             {
-                int purged = queue.delete(body.ifUnused, body.ifEmpty);
+                int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
 
                 if (queue.isDurable())
                 {
                     store.removeQueue(queue.getName());
                 }
-                
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
-                                                                    (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                    purged));    // messageCount
+
+                MethodRegistry methodRegistry = session.getMethodRegistry();
+                QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+                session.writeFrame(responseBody.generateFrame(channelId));
             }
         }
     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sat Nov 24 13:14:14 2007
@@ -24,6 +24,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.QueuePurgeBody;
 import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,22 +57,22 @@
         _failIfNotFound = failIfNotFound;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        AMQChannel channel = session.getChannel(evt.getChannelId());
+        AMQChannel channel = session.getChannel(channelId);
+
 
-        QueuePurgeBody body = evt.getMethod();
         AMQQueue queue;
-        if(body.queue == null)
+        if(body.getQueue() == null)
         {
 
            if (channel == null)
            {
-               throw body.getChannelNotFoundException(evt.getChannelId());
+               throw body.getChannelNotFoundException(channelId);
            }
 
            //get the default queue on the channel:
@@ -86,14 +88,14 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if(queue == null)
         {
             if(_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
@@ -101,14 +103,13 @@
                 long purged = queue.clearQueue(channel.getStoreContext());
 
 
-                if(!body.nowait)
+                if(!body.getNowait())
                 {
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        purged));	// messageCount
+
+                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                    
                 }
         }
     }