You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC

svn commit: r619823 [6/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -26,11 +26,7 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,7 +48,7 @@
 
     private static final int DEFAULT_FRAME_SIZE = 65536;
 
-    public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
+    public static ConnectionStartOkMethodHandler getInstance()
     {
         return _instance;
     }
@@ -61,51 +57,51 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        final ConnectionStartOkBody body = evt.getMethod();
-        _logger.info("SASL Mechanism selected: " + body.mechanism);
-        _logger.info("Locale selected: " + body.locale);
+
+        _logger.info("SASL Mechanism selected: " + body.getMechanism());
+        _logger.info("Locale selected: " + body.getLocale());
 
         AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();//session.getVirtualHost().getAuthenticationManager();
 
         SaslServer ss = null;
         try
         {
-            ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+            ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
 
             if (ss == null)
             {
-                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.mechanism
+                throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()
                 );
             }
 
             session.setSaslServer(ss);
 
-            AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+            AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
 
             //save clientProperties
             if (session.getClientProperties() == null)
             {
-                session.setClientProperties(body.clientProperties);
+                session.setClientProperties(body.getClientProperties());
             }
 
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+
             switch (authResult.status)
             {
                 case ERROR:
                     _logger.info("Authentication failed");
                     stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
-                                                                        (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                        ConnectionCloseBody.getClazz((byte) 8, (byte) 0),        // classId
-                                                                        ConnectionCloseBody.getMethod((byte) 8, (byte) 0),    // methodId
-                                                                        AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
-                                                                        AMQConstant.NOT_ALLOWED.getName());    // replyText
-                    session.writeFrame(close);
+
+                    ConnectionCloseBody closeBody =
+                            methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
+                                                                     AMQConstant.NOT_ALLOWED.getName(),
+                                                                     body.getClazz(),
+                                                                     body.getMethod());
+
+                    session.writeFrame(closeBody.generateFrame(0));
                     disposeSaslServer(session);
                     break;
 
@@ -114,25 +110,17 @@
                     session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
-                                                                      (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                      Integer.MAX_VALUE,    // channelMax
-                                                                      getConfiguredFrameSize(),    // frameMax
-                                                                      HeartbeatConfig.getInstance().getDelay());    // heartbeat
-                    session.writeFrame(tune);
+
+                    ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+                                                                                          getConfiguredFrameSize(),
+                                                                                          HeartbeatConfig.getInstance().getDelay());
+                    session.writeFrame(tuneBody.generateFrame(0));
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
-                                                                             (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                             authResult.challenge);    // challenge
-                    session.writeFrame(challenge);
+
+                    ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+                    session.writeFrame(secureBody.generateFrame(0));
             }
         }
         catch (SaslException e)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -40,15 +40,15 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        ConnectionTuneOkBody body = evt.getMethod();
+
         if (_logger.isDebugEnabled())
         {
             _logger.debug(body);
         }
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
-        session.initHeartbeats(body.heartbeat);
+        session.initHeartbeats(body.getHeartbeat());
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Feb  8 02:09:37 2008
@@ -21,10 +21,8 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,6 +34,8 @@
 
 /**
  * @author Apache Software Foundation
+ *
+ *
  */
 public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
 {
@@ -64,35 +64,32 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
+        
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        MethodRegistry methodRegistry = session.getMethodRegistry();
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        byte major = (byte)8;
-        byte minor = (byte)0;
         
-        ExchangeBoundBody body = evt.getMethod();
 
-        AMQShortString exchangeName = body.exchange;
-        AMQShortString queueName = body.queue;
-        AMQShortString routingKey = body.routingKey;
+
+        AMQShortString exchangeName = body.getExchange();
+        AMQShortString queueName = body.getQueue();
+        AMQShortString routingKey = body.getRoutingKey();
         if (exchangeName == null)
         {
             throw new AMQException(null, "Exchange exchange must not be null", null);
         }
         Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
-        AMQFrame response;
+        ExchangeBoundOkBody response;
         if (exchange == null)
         {
-            // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                major, minor,	// AMQP version (major, minor)
-                EXCHANGE_NOT_FOUND,	// replyCode
-                new AMQShortString("Exchange " + exchangeName + " not found"));	// replyText
+
+
+            response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+                                                                new AMQShortString("Exchange " + exchangeName + " not found"));
         }
         else if (routingKey == null)
         {
@@ -100,18 +97,12 @@
             {
                 if (exchange.hasBindings())
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        OK,	// replyCode
-                        null);	// replyText
+                    response = methodRegistry.createExchangeBoundOkBody(OK, null);
                 }
                 else
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        NO_BINDINGS,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS,	// replyCode
                         null);	// replyText
                 }
             }
@@ -121,28 +112,22 @@
                 AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        QUEUE_NOT_FOUND,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
                         new AMQShortString("Queue " + queueName + " not found"));	// replyText
                 }
                 else
                 {
                     if (exchange.isBound(queue))
                     {
-                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
-                            OK,	// replyCode
+
+                        response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                             null);	// replyText
                     }
                     else
                     {
-                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
-                            QUEUE_NOT_BOUND,	// replyCode
+
+                        response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND,	// replyCode
                             new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName));	// replyText
                     }
                 }
@@ -153,53 +138,43 @@
             AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    QUEUE_NOT_FOUND,	// replyCode
+
+                response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND,	// replyCode
                     new AMQShortString("Queue " + queueName + " not found"));	// replyText
             }
             else
             {
-                if (exchange.isBound(body.routingKey, queue))
+                if (exchange.isBound(body.getRoutingKey(), queue))
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        OK,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                         null);	// replyText
                 }
                 else
                 {
-                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
-                        SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+
+                    response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
                         new AMQShortString("Queue " + queueName + " not bound with routing key " +
-                        body.routingKey + " to exchange " + exchangeName));	// replyText
+                        body.getRoutingKey() + " to exchange " + exchangeName));	// replyText
                 }
             }
         }
         else
         {
-            if (exchange.isBound(body.routingKey))
+            if (exchange.isBound(body.getRoutingKey()))
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    OK,	// replyCode
+
+                response = methodRegistry.createExchangeBoundOkBody(OK,	// replyCode
                     null);	// replyText
             }
             else
             {
-                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
-                    NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                    new AMQShortString("No queue bound with routing key " + body.routingKey +
+
+                response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                    new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
                     " to exchange " + exchangeName));	// replyText
             }
         }
-        session.writeFrame(response);
+        session.writeFrame(response.generateFrame(channelId));
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Feb  8 02:09:37 2008
@@ -24,9 +24,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
@@ -54,61 +52,60 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
         
-        final ExchangeDeclareBody body = evt.getMethod();
+
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+            _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
         }
         synchronized(exchangeRegistry)
         {
-            Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+            Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
 
 
 
             if (exchange == null)
             {
-                if(body.passive && ((body.type == null) || body.type.length() ==0))
+                if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
                 {
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);                    
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
                 }
                 else
                 {
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
-                                                              body.type == null ? null : body.type.intern(), 
-                                                              body.durable,
-                                                              body.passive, body.ticket);
+                    exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+                                                              body.getType() == null ? null : body.getType().intern(),
+                                                              body.getDurable(),
+                                                              body.getPassive());
                     exchangeRegistry.registerExchange(exchange);
                     }
                     catch(AMQUnknownExchangeType e)
                     {
-                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
                     }
                 }
             }
-            else if (!exchange.getType().equals(body.type))
+            else if (!exchange.getType().equals(body.getType()))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);    
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
             }
 
         }
-        if(!body.nowait)
+        if(!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
         }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Feb  8 02:09:37 2008
@@ -45,21 +45,20 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
 
-        ExchangeDeleteBody body = evt.getMethod();
+
         try
         {
-            exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
-            session.writeFrame(response);
+            exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+            ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+                        
+            session.writeFrame(responseBody.generateFrame(channelId));
         }
         catch (ExchangeInUseException e)
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Feb  8 02:09:37 2008
@@ -23,9 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
@@ -53,22 +51,24 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        final QueueBindBody body = evt.getMethod();
+
         final AMQQueue queue;
-        if (body.queue == null)
+        final AMQShortString routingKey;
+
+        if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             queue = channel.getDefaultQueue();
@@ -78,41 +78,42 @@
                 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
             }
 
-            if (body.routingKey == null)
+            if (body.getRoutingKey() == null)
+            {
+                routingKey = queue.getName();
+            }
+            else
             {
-                body.routingKey = queue.getName();
+                routingKey = body.getRoutingKey().intern();
             }
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
+            routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
         }
 
         if (queue == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
         }
-        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
         if (exch == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
         }
 
-        if (body.routingKey != null)
-        {
-            body.routingKey = body.routingKey.intern();
-        }
 
         try
         {
-            if (!exch.isBound(body.routingKey, body.arguments, queue))
+            if (!exch.isBound(routingKey, body.getArguments(), queue))
             {
-                queue.bind(body.routingKey, body.arguments, exch);
+                queue.bind(routingKey, body.getArguments(), exch);
             }
         }
         catch (AMQInvalidRoutingKeyException rke)
         {
-            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
         }
         catch (AMQException e)
         {
@@ -121,15 +122,14 @@
 
         if (_log.isInfoEnabled())
         {
-            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
         }
-        if (!body.nowait)
+        if (!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
         }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Feb  8 02:09:37 2008
@@ -27,10 +27,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.configuration.Configurator;
@@ -69,7 +66,7 @@
         Configurator.configure(this);
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
@@ -77,12 +74,20 @@
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore store = virtualHost.getMessageStore();
 
-        QueueDeclareBody body = evt.getMethod();
+
+
+
+        final AMQShortString queueName;
 
         // if we aren't given a queue name, we create one which we return to the client
-        if (body.queue == null)
+
+        if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+        {
+            queueName = createName();
+        }
+        else
         {
-            body.queue = createName();
+            queueName = body.getQueue().intern();
         }
 
         AMQQueue queue;
@@ -91,21 +96,17 @@
         synchronized (queueRegistry)
         {
 
-            if (((queue = queueRegistry.getQueue(body.queue)) == null))
+            if (((queue = queueRegistry.getQueue(queueName)) == null))
             {
-                if(body.queue != null)
-                {
-                    body.queue = body.queue.intern();
-                }
 
-                if (body.passive)
+                if (body.getPassive())
                 {
-                    String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+                    String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
                     throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
                 }
                 else
                 {
-                    queue = createQueue(body, virtualHost, session);
+                    queue = createQueue(queueName,body, virtualHost, session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
                         //DTX MessageStore
@@ -122,42 +123,40 @@
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                        queue.bind(body.queue, null, defaultExchange);
-                        _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
+                        queue.bind(queueName, null, defaultExchange);
+                        _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
                     }
                 }
             }
             else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
             {
-                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
                                                                            + "declared on another client ID('"
                                                                            + queue.getOwner() + "')");
             }
 
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             //set this as the default queue on the channel:
             channel.setDefaultQueue(queue);
         }
 
-        if (!body.nowait)
+        if (!body.getNowait())
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
-                                                                  (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                  queue.getConsumerCount(), // consumerCount
-                                                                  queue.getMessageCount(), // messageCount
-                                                                  body.queue); // queue
-            _log.info("Queue " + body.queue + " declared successfully");
-            session.writeFrame(response);
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            QueueDeclareOkBody responseBody =
+                    methodRegistry.createQueueDeclareOkBody(queueName,
+                                                            queue.getMessageCount(),
+                                                            queue.getConsumerCount());
+            session.writeFrame(responseBody.generateFrame(channelId));
+
+            _log.info("Queue " + queueName + " declared successfully");
         }
     }
 
@@ -171,15 +170,18 @@
         return MessageFormat.format("{0,number,0000000000000}", value);
     }
 
-    protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
+    protected AMQQueue createQueue(final AMQShortString queueName,
+                                   QueueDeclareBody body,
+                                   VirtualHost virtualHost,
+                                   final AMQProtocolSession session)
             throws AMQException
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
-        AMQShortString owner = body.exclusive ? session.getContextKey() : null;
-        final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
-        final AMQShortString queueName = queue.getName();
+        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+        final AMQQueue queue = new AMQQueue(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost);
+
 
-        if (body.exclusive && !body.durable)
+        if (body.getExclusive() && !body.getDurable())
         {
             final AMQProtocolSession.Task deleteQueueTask =
                     new AMQProtocolSession.Task()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Feb  8 02:09:37 2008
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -58,22 +59,21 @@
 
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore store = virtualHost.getMessageStore();
 
-        QueueDeleteBody body = evt.getMethod();
         AMQQueue queue;
-        if (body.queue == null)
+        if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw body.getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             //get the default queue on the channel:            
@@ -81,31 +81,31 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if (queue == null)
         {
             if (_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
         {
-            if (body.ifEmpty && !queue.isEmpty())
+            if (body.getIfEmpty() && !queue.isEmpty())
             {
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty.");
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
             }
-            else if (body.ifUnused && !queue.isUnused())
+            else if (body.getIfUnused() && !queue.isUnused())
             {
                 // TODO - Error code
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used.");
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
 
             }
             else
             {
-                int purged = queue.delete(body.ifUnused, body.ifEmpty);
+                int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
 
                 if (queue.isDurable())
                 {
@@ -119,13 +119,10 @@
                       throw new AMQException(null, "problem when destroying queue " + queue, e);
                     }
                 }
-                
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
-                                                                    (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                    purged));    // messageCount
+
+                MethodRegistry methodRegistry = session.getMethodRegistry();
+                QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+                session.writeFrame(responseBody.generateFrame(channelId));
             }
         }
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.QueuePurgeBody;
 import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,22 +57,22 @@
         _failIfNotFound = failIfNotFound;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        AMQChannel channel = session.getChannel(evt.getChannelId());
+        AMQChannel channel = session.getChannel(channelId);
+
 
-        QueuePurgeBody body = evt.getMethod();
         AMQQueue queue;
-        if(body.queue == null)
+        if(body.getQueue() == null)
         {
 
            if (channel == null)
            {
-               throw body.getChannelNotFoundException(evt.getChannelId());
+               throw body.getChannelNotFoundException(channelId);
            }
 
            //get the default queue on the channel:
@@ -86,14 +88,14 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if(queue == null)
         {
             if(_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
@@ -101,14 +103,13 @@
                 long purged = queue.clearQueue(channel.getStoreContext());
 
 
-                if(!body.nowait)
+                if(!body.getNowait())
                 {
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
-                    session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        purged));	// messageCount
+
+                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                    
                 }
         }
     }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,110 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+    private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+    private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+    public static QueueUnbindHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private QueueUnbindHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+        final AMQQueue queue;
+        final AMQShortString routingKey;
+
+        if (body.getQueue() == null)
+        {
+            AMQChannel channel = session.getChannel(channelId);
+
+            if (channel == null)
+            {
+                throw body.getChannelNotFoundException(channelId);
+            }
+
+            queue = channel.getDefaultQueue();
+
+            if (queue == null)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+            }
+
+            routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+        }
+        else
+        {
+            queue = queueRegistry.getQueue(body.getQueue());
+            routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+        }
+
+        if (queue == null)
+        {
+            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+        }
+        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+        if (exch == null)
+        {
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+        }
+
+
+        try
+        {
+            queue.unBind(routingKey, body.getArguments(), exch);
+        }
+        catch (AMQInvalidRoutingKeyException rke)
+        {
+            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+        }
+        catch (AMQException e)
+        {
+            if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+            }
+            throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+        }
+
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+        }
+
+        MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
+
+
+    }
+}

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl implements MethodDispatcher
+{
+    private final AMQStateManager _stateManager;
+
+    private static interface DispatcherFactory
+        {
+            public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+        }
+
+        private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+                new HashMap<ProtocolVersion, DispatcherFactory>();
+
+
+    static
+        {
+            _dispatcherFactories.put(ProtocolVersion.v8_0,
+                                     new DispatcherFactory()
+                                     {
+                                         public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+                                         {
+                                             return new ServerMethodDispatcherImpl_8_0(stateManager);
+                                         }
+                                     });
+
+            _dispatcherFactories.put(ProtocolVersion.v0_9,
+                                     new DispatcherFactory()
+                                     {
+                                         public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+                                         {
+                                             return new ServerMethodDispatcherImpl_0_9(stateManager);
+                                         }
+                                     });
+
+        }
+
+
+    private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
+    private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
+    private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
+    private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
+    private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+    private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
+    private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
+    private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
+    private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
+    private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
+    private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
+    private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
+    private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
+    private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
+    private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
+    private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
+    private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
+    private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
+    private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
+    private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
+    private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
+    private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
+    private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
+    private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
+    private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
+    private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
+    private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
+    private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
+    private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+
+
+
+    public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+    {
+        return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
+    }
+
+
+    public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
+    }
+
+
+    protected AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+    
+
+
+    public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+    {
+        _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+    {
+        _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+    {
+        _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+    {
+        _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+    {
+        _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+    {
+        _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+    {
+        _basicQosHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+    {
+        _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+    {
+        _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+    {
+        _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+    public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+    {
+        _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+    public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+    {
+        _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+    public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+    {
+        _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+
+    public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+    {
+        _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+    public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+    {
+        _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+    public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+    {
+        _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+
+    public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+    {
+        _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+    {
+        _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+    {
+        _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+    {
+        _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+    {
+        _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+    {
+        _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+    {
+        _queueBindHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+    {
+        _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+    {
+        _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+    {
+        _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+    {
+        _txCommitHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+    {
+        _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+    {
+        _txSelectHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+
+
+
+}

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+
+public class ServerMethodDispatcherImpl_0_9
+        extends ServerMethodDispatcherImpl
+        implements MethodDispatcher_0_9
+
+{
+
+    private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
+            BasicRecoverSyncMethodHandler.getInstance();
+    private static final QueueUnbindHandler _queueUnbindHandler =
+            QueueUnbindHandler.getInstance();
+
+
+    public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+    public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+    {
+        _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+    {
+        _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+        return true;
+    }
+}

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+public class ServerMethodDispatcherImpl_8_0
+        extends ServerMethodDispatcherImpl
+        implements MethodDispatcher_8_0
+{
+    public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+    public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxCommitBody;
 import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -45,7 +47,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
@@ -53,25 +55,26 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Commit received on channel " + evt.getChannelId());
+                _log.debug("Commit received on channel " + channelId);
             }
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             channel.commit();
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+            
             channel.processReturns(session);
         }
         catch (AMQException e)
         {
-            throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+            throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
     }
 }