You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/22 14:15:14 UTC

svn commit: r521253 [7/10] - in /incubator/qpid/branches/java.multi_version: ./ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/ gentools/templ.cpp/class/ gentools/templ.cpp/field/ gentools/templ.cpp/method/ gentools/templ.cpp/model/ gentools...

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Thu Mar 22 06:14:42 2007
@@ -22,6 +22,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ExchangeBoundBody;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.amqp_8_0.ExchangeBoundOkBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -74,9 +75,9 @@
         
         ExchangeBoundBody body = evt.getMethod();
 
-        AMQShortString exchangeName = body.exchange;
-        AMQShortString queueName = body.queue;
-        AMQShortString routingKey = body.routingKey;
+        AMQShortString exchangeName = body.getExchange();
+        AMQShortString queueName = body.getQueue();
+        AMQShortString routingKey = body.getRoutingKey();
         if (exchangeName == null)
         {
             throw new AMQException("Exchange exchange must not be null");
@@ -86,8 +87,8 @@
         if (exchange == null)
         {
             // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                major, minor,	// AMQP version (major, minor)
+            response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                        // AMQP version (major, minor)
                 EXCHANGE_NOT_FOUND,	// replyCode
                 new AMQShortString("Exchange " + exchangeName + " not found"));	// replyText
         }
@@ -98,16 +99,16 @@
                 if (exchange.hasBindings())
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
+                    response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                // AMQP version (major, minor)
                         OK,	// replyCode
                         null);	// replyText
                 }
                 else
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
+                    response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                // AMQP version (major, minor)
                         NO_BINDINGS,	// replyCode
                         null);	// replyText
                 }
@@ -119,8 +120,8 @@
                 if (queue == null)
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
+                    response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                // AMQP version (major, minor)
                         QUEUE_NOT_FOUND,	// replyCode
                         new AMQShortString("Queue " + queueName + " not found"));	// replyText
                 }
@@ -129,16 +130,16 @@
                     if (exchange.isBound(queue))
                     {
                         // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
+                        response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                    // AMQP version (major, minor)
                             OK,	// replyCode
                             null);	// replyText
                     }
                     else
                     {
                         // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                            major, minor,	// AMQP version (major, minor)
+                        response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                    // AMQP version (major, minor)
                             QUEUE_NOT_BOUND,	// replyCode
                             new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName));	// replyText
                     }
@@ -151,52 +152,58 @@
             if (queue == null)
             {
                 // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
+                response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                            // AMQP version (major, minor)
                     QUEUE_NOT_FOUND,	// replyCode
                     new AMQShortString("Queue " + queueName + " not found"));	// replyText
             }
             else
             {
-                if (exchange.isBound(body.routingKey, queue))
+                if (exchange.isBound(body.getRoutingKey(), queue))
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
+                    response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                // AMQP version (major, minor)
                         OK,	// replyCode
                         null);	// replyText
                 }
                 else
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                        major, minor,	// AMQP version (major, minor)
+                    response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                                // AMQP version (major, minor)
                         SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
                         new AMQShortString("Queue " + queueName + " not bound with routing key " +
-                        body.routingKey + " to exchange " + exchangeName));	// replyText
+                        body.getRoutingKey() + " to exchange " + exchangeName));	// replyText
                 }
             }
         }
         else
         {
-            if (exchange.isBound(body.routingKey))
+            if (exchange.isBound(body.getRoutingKey()))
             {
                 // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
+                response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                            // AMQP version (major, minor)
                     OK,	// replyCode
                     null);	// replyText
             }
             else
             {
                 // AMQP version change:  Be aware of possible changes to parameter order as versions change.
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                    major, minor,	// AMQP version (major, minor)
+                response = createExchangeBoundResponseFrame(evt.getChannelId(),
+                                                            // AMQP version (major, minor)
                     NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                    new AMQShortString("No queue bound with routing key " + body.routingKey +
+                    new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
                     " to exchange " + exchangeName));	// replyText
             }
         }
         session.writeFrame(response);
+    }
+
+    private AMQFrame createExchangeBoundResponseFrame(int channelId, int replyCode, AMQShortString replyText)
+    {
+        ExchangeBoundOkBody okBody = new ExchangeBoundOkBodyImpl(replyCode,replyText);
+        return new AMQFrame(channelId,okBody);
     }
 }

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Thu Mar 22 06:14:42 2007
@@ -64,43 +64,43 @@
         final ExchangeDeclareBody body = evt.getMethod();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+            _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
         }
         synchronized(exchangeRegistry)
         {
-            Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+            Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
 
 
 
             if (exchange == null)
             {
-                if(body.passive && ((body.type == null) || body.type.length() ==0))
+                if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
                 {
-                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);                    
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
                 }
                 else
                 {
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
-                                                              body.passive, body.ticket);
+                    exchange = exchangeFactory.createExchange(body.getExchange(), body.getType(), body.getDurable(),
+                                                              body.getPassive(), body.getTicket());
                     exchangeRegistry.registerExchange(exchange);
                     }
                     catch(AMQUnknownExchangeType e)
                     {
-                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
                     }
                 }
             }
-            else if (!exchange.getType().equals(body.type))
+            else if (!exchange.getType().equals(body.getType()))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());    
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
             }
 
         }
-        if(!body.nowait)
+        if(!body.getNowait())
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Thu Mar 22 06:14:42 2007
@@ -54,7 +54,7 @@
         ExchangeDeleteBody body = evt.getMethod();
         try
         {
-            exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
+            exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Thu Mar 22 06:14:42 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.QueueBindBody;
 import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
@@ -61,8 +62,10 @@
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
         final QueueBindBody body = evt.getMethod();
+
+        AMQShortString routingKey = body.getRoutingKey();
         final AMQQueue queue;
-        if (body.queue == null)
+        if (body.getQueue() == null)
         {
             AMQChannel channel = session.getChannel(evt.getChannelId());
 
@@ -77,33 +80,35 @@
             {
                 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
             }
-            
-            if (body.routingKey == null)
+
+
+
+            if (routingKey == null)
             {
-                body.routingKey = queue.getName();
+                routingKey = queue.getName();
             }
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if (queue == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
         }
-        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
         if (exch == null)
         {
-            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
         }
         try
         {            
-            queue.bind(body.routingKey, body.arguments, exch);
+            queue.bind(routingKey, body.getArguments(), exch);
         }
         catch (AMQInvalidRoutingKeyException rke)
         {
-            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
         }
         catch (AMQException e)
         {
@@ -112,9 +117,9 @@
 
         if (_log.isInfoEnabled())
         {
-            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
         }
-        if (!body.nowait)
+        if (!body.getNowait())
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Mar 22 06:14:42 2007
@@ -78,10 +78,12 @@
 
         QueueDeclareBody body = evt.getMethod();
 
+        AMQShortString queueName = body.getQueue();
+
         // if we aren't given a queue name, we create one which we return to the client
-        if (body.queue == null)
+        if (body.getQueue() == null)
         {
-            body.queue = createName();
+            queueName = createName();
         }
 
         AMQQueue queue;
@@ -90,11 +92,11 @@
         synchronized (queueRegistry)
         {
 
-            if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+            if (((queue = queueRegistry.getQueue(queueName)) == null) )
             {
-                if(body.passive)
+                if(body.getPassive())
                 {
-                    String msg = "Queue: " + body.queue + " not found.";
+                    String msg = "Queue: " + queueName + " not found.";
                     throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
                 }
                 else
@@ -109,8 +111,8 @@
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                        queue.bind(body.queue, null, defaultExchange);
-                        _log.info("Queue " + body.queue + " bound to default exchange");
+                        queue.bind(queueName, null, defaultExchange);
+                        _log.info("Queue " + body.getQueue() + " bound to default exchange");
                     }
                 }
             }
@@ -130,7 +132,7 @@
             channel.setDefaultQueue(queue);
         }
 
-        if (!body.nowait)
+        if (!body.getNowait())
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -139,8 +141,8 @@
                 (byte)8, (byte)0,	// AMQP version (major, minor)
                 queue.getConsumerCount(), // consumerCount
                 queue.getMessageCount(), // messageCount
-                body.queue); // queue
-            _log.info("Queue " + body.queue + " declared successfully");
+                queueName); // queue
+            _log.info("Queue " + queueName + " declared successfully");
             session.writeFrame(response);
         }
     }
@@ -159,11 +161,11 @@
             throws AMQException
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
-        AMQShortString owner = body.exclusive ? session.getContextKey() : null;
-        final AMQQueue queue =  new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+        final AMQQueue queue =  new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost);
         final AMQShortString queueName = queue.getName();
 
-        if(body.exclusive && !body.durable)
+        if(body.getExclusive() && !body.getDurable())
         {
             final AMQProtocolSession.Task deleteQueueTask =
                 new AMQProtocolSession.Task()

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Thu Mar 22 06:14:42 2007
@@ -65,7 +65,7 @@
 
         QueueDeleteBody body = evt.getMethod();
         AMQQueue queue;
-        if(body.queue == null)
+        if(body.getQueue() == null)
         {
              AMQChannel channel = session.getChannel(evt.getChannelId());
 
@@ -79,31 +79,31 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if(queue == null)
         {
             if(_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
         {
-            if(body.ifEmpty && !queue.isEmpty())
+            if(body.getIfEmpty() && !queue.isEmpty())
             {
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." );
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty." );
             }
-            else if(body.ifUnused && !queue.isUnused())
+            else if(body.getIfUnused() && !queue.isUnused())
             {                
                 // TODO - Error code
-                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." );
+                throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used." );
 
             }
             else
             {
-                int purged = queue.delete(body.ifUnused, body.ifEmpty);
+                int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
                 store.removeQueue(queue.getName());
                 // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                 // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Thu Mar 22 06:14:42 2007
@@ -44,7 +44,7 @@
 
         QueuePurgeBody body = evt.getMethod();
         AMQQueue queue;
-        if(body.queue == null)
+        if(body.getQueue() == null)
         {
 
            if (channel == null)
@@ -65,14 +65,14 @@
         }
         else
         {
-            queue = queueRegistry.getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.getQueue());
         }
 
         if(queue == null)
         {
             if(_failIfNotFound)
             {
-                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+                throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
             }
         }
         else
@@ -80,7 +80,7 @@
                 long purged = queue.clearQueue(channel.getStoreContext());
 
 
-                if(!body.nowait)
+                if(!body.getNowait())
                 {
                     // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                     // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Thu Mar 22 06:14:42 2007
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxRollbackBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,10 +58,10 @@
             }
 
             channel.rollback();
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+            TxRollbackBody rollbackBody = createTxRollbackBody();
+            session.writeFrame(new AMQFrame(evt.getChannelId(), rollbackBody));
+
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
@@ -69,5 +71,10 @@
         {
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }
+    }
+
+    private TxRollbackBody createTxRollbackBody()
+    {
+        return new TxRollbackBodyImpl();
     }
 }

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Thu Mar 22 06:14:42 2007
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.amqp_8_0.TxSelectBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -55,9 +57,12 @@
 
         channel.setLocalTransactional();
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+        TxSelectBody txSelect = createTxSelectBody();
+        session.writeFrame(new AMQFrame(evt.getChannelId(), txSelect));
+    }
+
+    private TxSelectBody createTxSelectBody()
+    {
+        return new TxSelectBodyImpl();
     }
 }

Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Mar 22 06:14:42 2007
@@ -44,6 +44,7 @@
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartBodyImpl;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -210,9 +211,9 @@
             _logger.debug("Frame Received: " + frame);
         }
 
-        if (body instanceof AMQMethodBody)
+        if (body instanceof AMQMethodBodyImpl)
         {
-            methodFrameReceived(channelId, (AMQMethodBody)body);
+            methodFrameReceived(channelId, (AMQMethodBodyImpl)body);
         }
         else if (body instanceof ContentHeaderBody)
         {
@@ -247,14 +248,13 @@
 
             String locales = "en_US";
 
+            ConnectionStartBody connectionStartBody = createConnectionStartBody(pi,
+                                                                                locales.getBytes(),
+                                                                                mechanisms.getBytes(),
+                                                                                null);
+
             // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
-                                                                   getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                   locales.getBytes(),    // locales
-                                                                   mechanisms.getBytes(),    // mechanisms
-                                                                   null,    // serverProperties
-                                                                   (short) getProtocolMajorVersion(),    // versionMajor
-                                                                   (short) getProtocolMinorVersion());    // versionMinor
+            AMQFrame response = new AMQFrame((short) 0,connectionStartBody);    // versionMinor
             _minaProtocolSession.write(response);
         }
         catch (AMQException e)
@@ -272,11 +272,19 @@
         }
     }
 
+    private ConnectionStartBody createConnectionStartBody(ProtocolInitiation pi,
+                                                          byte[] locales,
+                                                          byte[] mechanisms,
+                                                          FieldTable serverProperties)
+    {
+        return new ConnectionStartBodyImpl(pi._protocolMajor, pi._protocolMinor,serverProperties,mechanisms,locales);
+    }
+
 
-    private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+    private void methodFrameReceived(int channelId, AMQMethodBodyImpl methodBody)
     {
 
-        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
+        final AMQMethodEvent<AMQMethodBodyImpl> evt = new AMQMethodEvent<AMQMethodBodyImpl>(channelId,
                                                                                     methodBody);
 
         //Check that this channel is not closing
@@ -736,4 +744,8 @@
     }
 
 
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
 }

Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java Thu Mar 22 06:14:42 2007
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.AMQException;
 
 public class AMQNoMethodHandlerException extends AMQException
 {
 
-    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBodyImpl> evt)
     {
         super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
     }

Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Mar 22 06:14:42 2007
@@ -40,6 +40,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.AMQManagedObject;
@@ -218,18 +219,11 @@
      */
     public void closeConnection() throws JMException
     {        
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
-            _session.getProtocolMajorVersion(),
-            _session.getProtocolMinorVersion(),	// AMQP version (major, minor)
-            0,	// classId
-            0,	// methodId
-        	AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+        ConnectionCloseBody connectionCloseBody =
+                createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
             BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION    // replyText
             );
-        _session.writeFrame(response);
+        _session.writeFrame(new AMQFrame(0,connectionCloseBody));
 
         try
         {
@@ -239,6 +233,11 @@
         {
             throw new MBeanException(ex, ex.toString());
         }
+    }
+
+    private ConnectionCloseBody createConnectionCloseBody(int code, AMQShortString replyText)
+    {
+        return new ConnectionCloseBodyImpl(code,replyText,0,0);
     }
 
     @Override

Copied: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (from r511389, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Mar 22 06:14:42 2007
@@ -21,7 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -125,7 +125,7 @@
             try
             {
 
-                AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+                AMQBodyImpl cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
                 return new AMQFrame(_channel, cb);
             }
             catch (AMQException e)

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Thu Mar 22 06:14:42 2007
@@ -27,35 +27,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.handler.BasicAckMethodHandler;

Modified: incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Thu Mar 22 06:14:42 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.state;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 

Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Mar 22 06:14:42 2007
@@ -25,7 +25,8 @@
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.AMQUnresolvedAddressException;
 import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -36,6 +37,8 @@
 import org.apache.qpid.framing.ChannelOpenOkBody;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.Connection;
@@ -92,7 +95,7 @@
      * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
      * handler.
      */
-    private AMQProtocolHandler _protocolHandler;
+    private AMQProtocolHandlerImpl _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
     private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap    
@@ -273,7 +276,7 @@
 
         _failoverPolicy = new FailoverPolicy(connectionURL);
 
-        _protocolHandler = new AMQProtocolHandler(this);
+        _protocolHandler = new AMQProtocolHandlerImpl(this);
 
         // We are not currently connected
         _connected = false;
@@ -550,26 +553,15 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
+        // define this here, should be poassed in
+        final int prefetchSize = 0;
 
-        // TODO: Be aware of possible changes to parameter order as versions change.
+        AMQMethodFactory methodFactory = getAMQMethodFactory();
 
-        _protocolHandler.syncWrite(
-                ChannelOpenBody.createAMQFrame(channelId,
-                                               _protocolHandler.getProtocolMajorVersion(),
-                                               _protocolHandler.getProtocolMinorVersion(),
-                                               null),    // outOfBand
-                                                         ChannelOpenOkBody.class);
-
-        //todo send low water mark when protocol allows.
-        //todo Be aware of possible changes to parameter order as versions change.
-        _protocolHandler.syncWrite(
-                BasicQosBody.createAMQFrame(channelId,
-                                            _protocolHandler.getProtocolMajorVersion(),
-                                            _protocolHandler.getProtocolMinorVersion(),
-                                            false,    // global
-                                            prefetchHigh,    // prefetchCount
-                                            0),    // prefetchSize
-                                                   BasicQosOkBody.class);
+        ChannelOpenBody openBody = methodFactory.createChannelOpen();
+        sendCommandReceiveResponse(channelId, openBody);
+        AMQMethodBody qosBody = methodFactory.createMessageQos(prefetchHigh, prefetchSize);
+        sendCommandReceiveResponse(channelId, qosBody);
 
         if (transacted)
         {
@@ -578,14 +570,21 @@
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
 
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
-                                                                   _protocolHandler.getProtocolMajorVersion(),
-                                                                   _protocolHandler.getProtocolMinorVersion()),
-                                       TxSelectOkBody.class);
+            TxSelectBody txSelectBody = methodFactory.createTxSelect();
+            sendCommandReceiveResponse(channelId, txSelectBody);
         }
     }
 
+    private AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+    {
+        return getProtocolOutputHandler().sendCommandReceiveResponse(channelId, command);
+    }
+
+    private AMQMethodFactory getAMQMethodFactory()
+    {
+        return getProtocolOutputHandler().getAMQMethodFactory();
+    }
+
     private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
     {
         try
@@ -934,7 +933,7 @@
         return _virtualHost;
     }
 
-    public AMQProtocolHandler getProtocolHandler()
+    public AMQProtocolHandlerImpl getProtocolHandler()
     {
         return _protocolHandler;
     }
@@ -1217,5 +1216,10 @@
     public void performConnectionTask(Runnable task)
     {
         _taskPool.execute(task);
+    }
+
+    public ProtocolOutputHandler getProtocolOutputHandler()
+    {
+        return _protocolHandler.getOutputHandler();
     }
 }

Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar 22 06:14:42 2007
@@ -59,6 +59,7 @@
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSBytesMessage;
@@ -68,41 +69,12 @@
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -197,6 +169,7 @@
     private boolean _suspended;
 
     private final Object _suspensionLock = new Object();
+    private static final AMQShortString CHANNEL_CLOSE_REPLY_TEXT = new AMQShortString("JMS client closing channel");
 
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
@@ -271,11 +244,11 @@
         {
             if (message.getDeliverBody() != null)
             {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
 
                 if (consumer == null)
                 {
-                    _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
+                    _logger.warn("Received a message from queue " + message.getDeliverBody().getConsumerTag() + " without a handler - ignoring...");
                     _logger.warn("Consumers that exist: " + _consumers);
                     _logger.warn("Session hashcode: " + System.identityHashCode(this));
                 }
@@ -513,14 +486,11 @@
                 i.next().acknowledgeLastDelivered();
             }
 
-            // Commits outstanding messages sent and outstanding acknowledgements.
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            final AMQProtocolHandler handler = getProtocolHandler();
-
-            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
-                                                          getProtocolMajorVersion(),
-                                                          getProtocolMinorVersion()),
-                              TxCommitOkBody.class);
+            TxCommitBody commitBody = getAMQMethodFactory().createTxCommit();
+            sendCommandReceiveResponse(commitBody);
+
+
+
         }
         catch (AMQException e)
         {
@@ -549,8 +519,8 @@
                     suspendChannel(true);
                 }
 
-                _connection.getProtocolHandler().syncWrite(
-                        TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+                TxRollbackBody rollbackBody = getAMQMethodFactory().createTxRollback();
+                sendCommandReceiveResponse(rollbackBody);
 
                 if (_dispatcher != null)
                 {
@@ -590,15 +560,12 @@
                 {
 
                     getProtocolHandler().closeSession(this);
-                    // TODO: Be aware of possible changes to parameter order as versions change.
-                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
-                                                                           getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                           0,    // classId
-                                                                           0,    // methodId
-                                                                           AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
-                                                                           new AMQShortString("JMS client closing channel"));    // replyText
+                    ChannelCloseBody closeBody = getAMQMethodFactory().createChannelClose(AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
+                                                                                          CHANNEL_CLOSE_REPLY_TEXT);
+                    sendCommandReceiveResponse(closeBody, timeout);
+
+
 
-                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
                     // When control resumes at this point, a reply will have been received that
                     // indicates the broker has closed the channel successfully
 
@@ -617,21 +584,17 @@
         }
     }
 
-    private AMQProtocolHandler getProtocolHandler()
+    private AMQProtocolHandlerImpl getProtocolHandler()
     {
         return _connection.getProtocolHandler();
     }
 
-
-    private byte getProtocolMinorVersion()
+    public ProtocolOutputHandler getProtocolOutputHandler()
     {
-        return getProtocolHandler().getProtocolMinorVersion();
+        return _connection.getProtocolOutputHandler();
     }
 
-    private byte getProtocolMajorVersion()
-    {
-        return getProtocolHandler().getProtocolMajorVersion();
-    }
+
 
 
     /**
@@ -835,14 +798,12 @@
                 consumer.clearUnackedMessages();
             }
 
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                       getProtocolMajorVersion(),
-                                                                                       getProtocolMinorVersion(),
-                                                                                       false)    // requeue
-                    , BasicRecoverOkBody.class);
+            final boolean requeue = false;
+            AMQMethodBody recoverBody = getAMQMethodFactory().createRecover(requeue);
+            sendCommandReceiveResponse(recoverBody);
+
+
+
 
             if (_dispatcher != null)
             {
@@ -1226,136 +1187,60 @@
 
     public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
     {
-        declareExchange(name, type, getProtocolHandler());
+        ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+        sendCommandReceiveResponse(exchangeDeclare);
     }
 
     public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
-                                                            getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                            null,    // arguments
-                                                            false,    // autoDelete
-                                                            false,    // durable
-                                                            name,    // exchange
-                                                            false,    // internal
-                                                            false,    // nowait
-                                                            false,    // passive
-                                                            getTicket(),    // ticket
-                                                            type);    // type
-        getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
-    }
-
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
-    {
-        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
-    }
-
-    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
-    {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
-                                                                      getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                      null,    // arguments
-                                                                      false,    // autoDelete
-                                                                      false,    // durable
-                                                                      name,    // exchange
-                                                                      false,    // internal
-                                                                      false,    // nowait
-                                                                      false,    // passive
-                                                                      getTicket(),    // ticket
-                                                                      type);    // type
+        ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket());
+        sendCommandReceiveResponse(exchangeDeclare);
 
-        protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
-
     public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
     {
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
-                                                                getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                null,    // arguments
-                                                                autoDelete,    // autoDelete
-                                                                durable,    // durable
-                                                                exclusive,    // exclusive
-                                                                false,    // nowait
-                                                                false,    // passive
-                                                                name,    // queue
-                                                                getTicket());    // ticket
-
-        getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+        QueueDeclareBody queueDeclare = getAMQMethodFactory().createQueueDeclare(name, null, autoDelete, durable, exclusive, false ,getTicket());
+        sendCommandReceiveResponse(queueDeclare);
     }
 
 
     public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
-                                                          getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                          arguments,    // arguments
-                                                          exchangeName,    // exchange
-                                                          false,    // nowait
-                                                          queueName,    // queue
-                                                          routingKey,    // routingKey
-                                                          getTicket());    // ticket
-
-
-        getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+        QueueBindBody queueBind = getAMQMethodFactory().createQueueBind(queueName,exchangeName,routingKey,arguments,getTicket());
+        sendCommandReceiveResponse(queueBind);
     }
 
     /**
      * Declare the queue.
      *
      * @param amqd
-     * @param protocolHandler
      *
      * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
      *
      * @throws AMQException
      */
-    private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
+    private AMQShortString declareQueue(AMQDestination amqd) throws AMQException
     {
         // For queues (but not topics) we generate the name in the client rather than the
         // server. This allows the name to be reused on failover if required. In general,
         // the destination indicates whether it wants a name generated or not.
         if (amqd.isNameRequired())
         {
-            amqd.setQueueName(protocolHandler.generateQueueName());
+            amqd.setQueueName(getProtocolHandler().generateQueueName());
         }
 
         //TODO verify the destiation is valid. else throw 
 
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
-                                                                getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                null,    // arguments
-                                                                amqd.isAutoDelete(),    // autoDelete
-                                                                amqd.isDurable(),    // durable
-                                                                amqd.isExclusive(),    // exclusive
-                                                                false,    // nowait
-                                                                false,    // passive
-                                                                amqd.getAMQQueueName(),    // queue
-                                                                getTicket());    // ticket
+        createQueue(amqd.getAMQQueueName(),amqd.isAutoDelete(),amqd.isDurable(),amqd.isExclusive());
+
 
-        protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
         return amqd.getAMQQueueName();
     }
 
-    private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
+    private void bindQueue(AMQDestination amqd, AMQShortString queueName, FieldTable ft) throws AMQException
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
-                                                          getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                          ft,    // arguments
-                                                          amqd.getExchangeName(),    // exchange
-                                                          false,    // nowait
-                                                          queueName,    // queue
-                                                          amqd.getRoutingKey(),    // routingKey
-                                                          getTicket());    // ticket
-
-
-        protocolHandler.syncWrite(queueBind, QueueBindOkBody.class);
+        bindQueue(queueName,amqd.getRoutingKey(),ft,amqd.getExchangeName());
     }
 
     /**
@@ -1365,8 +1250,7 @@
      *
      * @return the consumer tag generated by the broker
      */
-    private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
-                                  boolean nowait, String messageSelector) throws AMQException
+    private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, boolean nowait, String messageSelector) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
@@ -1392,25 +1276,24 @@
 
         try
         {
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
-                                                                  getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                  arguments,    // arguments
-                                                                  tag,    // consumerTag
-                                                                  consumer.isExclusive(),    // exclusive
-                                                                  consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,    // noAck
-                                                                  consumer.isNoLocal(),    // noLocal
-                                                                  nowait,    // nowait
-                                                                  queueName,    // queue
-                                                                  getTicket());    // ticket
-            if (nowait)
+            final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE;
+
+            AMQMethodBody consumeBody = getAMQMethodFactory().createConsumer(tag,
+                                                 queueName,
+                                                 arguments,
+                                                 noAck,
+                                                 consumer.isExclusive(),
+                                                 consumer.isNoLocal(),
+                                                 getTicket());
+            if(nowait)
             {
-                protocolHandler.writeFrame(jmsConsume);
+                sendCommand(consumeBody);
             }
             else
             {
-                protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+                sendCommandReceiveResponse(consumeBody);
             }
+
         }
         catch (AMQException e)
         {
@@ -1606,15 +1489,9 @@
     {
         try
         {
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
-                                                                       getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                       false,    // ifEmpty
-                                                                       false,    // ifUnused
-                                                                       true,    // nowait
-                                                                       queueName,    // queue
-                                                                       getTicket());    // ticket
-            getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+            
+            QueueDeleteBody deleteBody = getAMQMethodFactory().createQueueDelete(queueName, false, false, getTicket());
+            sendCommandReceiveResponse(deleteBody);
         }
         catch (AMQException e)
         {
@@ -1697,23 +1574,23 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
-                                                               getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                               exchangeName,    // exchange
-                                                               queueName,    // queue
-                                                               routingKey);    // routingKey
         AMQMethodEvent response = null;
         try
         {
-            response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+            ExchangeBoundBody exchangeBoundBody =
+                getAMQMethodFactory().createExchangeBound(exchangeName,    // exchange
+                                                          queueName,    // queue
+                                                          routingKey);    // routingKey
+
+
+            ExchangeBoundOkBody responseBody = sendCommandReceiveResponse(exchangeBoundBody, ExchangeBoundOkBody.class);
+            return (responseBody.getReplyCode() == 0);
         }
         catch (AMQException e)
         {
             throw new JMSAMQException(e);
         }
-        ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-        return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency
+
     }
 
     private void checkTransacted() throws JMSException
@@ -1770,13 +1647,13 @@
                             // Bounced message is processed here, away from the mina thread
                             AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
                                                                                                       false,
-                                                                                                      message.getBounceBody().exchange,
-                                                                                                      message.getBounceBody().routingKey,
+                                                                                                      message.getBounceBody().getExchange(),
+                                                                                                      message.getBounceBody().getRoutingKey(),
                                                                                                       message.getContentHeader(),
                                                                                                       message.getBodies());
 
-                            AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
-                            AMQShortString reason = message.getBounceBody().replyText;
+                            AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode());
+                            AMQShortString reason = message.getBounceBody().getReplyText();
                             _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                             //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -1812,16 +1689,12 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
-                                                              getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                              deliveryTag,    // deliveryTag
-                                                              multiple);    // multiple
+        AMQMethodBody ackBody = getAMQMethodFactory().createAcknowledge(deliveryTag, multiple);
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
         }
-        getProtocolHandler().writeFrame(ackFrame);
+        sendCommand(ackBody);
     }
 
     public int getDefaultPrefetch()
@@ -1908,17 +1781,15 @@
     {
         AMQDestination amqd = consumer.getDestination();
 
-        AMQProtocolHandler protocolHandler = getProtocolHandler();
-
-        declareExchange(amqd, protocolHandler);
+        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass());
 
-        AMQShortString queueName = declareQueue(amqd, protocolHandler);
+        AMQShortString queueName = declareQueue(amqd);
 
-        bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+        bindQueue(amqd, queueName, consumer.getRawSelectorFieldTable());
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+            consumeFromQueue(consumer, queueName, nowait, consumer.getMessageSelector());
         }
         catch (JMSException e) //thrown by getMessageSelector
         {
@@ -2019,14 +1890,9 @@
 
             _suspended = suspend;
 
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-                                                                       getProtocolMajorVersion(),
-                                                                       getProtocolMinorVersion(),
-                                                                       !suspend);    // active
+            AMQMethodBody flowBody = getAMQMethodFactory().createChannelFlow(!suspend);
+            sendCommandReceiveResponse(flowBody);
 
-            _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
         }
     }
 
@@ -2118,32 +1984,15 @@
 
     public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
     {
-        getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
-                                                                                               getProtocolMajorVersion(),
-                                                                                               getProtocolMinorVersion(),
-                                                                                               active,
-                                                                                               exclusive,
-                                                                                               passive,
-                                                                                               read,
-                                                                                               realm,
-                                                                                               write),
-                                                              new BlockingMethodFrameListener(_channelId)
-                                                              {
 
-                                                                  public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
-                                                                  {
-                                                                      if (frame instanceof AccessRequestOkBody)
-                                                                      {
-                                                                          setTicket(((AccessRequestOkBody) frame).getTicket());
-                                                                          return true;
-                                                                      }
-                                                                      else
-                                                                      {
-                                                                          return false;
-                                                                      }
-                                                                  }
-                                                              });
+        AccessRequestBody accessRequest = getAMQMethodFactory().createAccessRequest(active,exclusive,passive,read,realm,write);
+        AccessRequestOkBody okBody = getProtocolOutputHandler().sendCommandReceiveResponse(_channelId,accessRequest, AccessRequestOkBody.class );
+        setTicket(okBody.getTicket());
+    }
 
+    AMQMethodFactory getAMQMethodFactory()
+    {
+        return getProtocolOutputHandler().getAMQMethodFactory();
     }
 
     private class SuspenderRunner implements Runnable
@@ -2187,7 +2036,7 @@
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
+            if (consumerTag == null || message.getDeliverBody().getConsumerTag().equals(consumerTag))
             {
                 if (_logger.isTraceEnabled())
                 {
@@ -2196,7 +2045,7 @@
 
                 messages.remove();
 
-                rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+                rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue);
 
                 _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
             }
@@ -2209,13 +2058,27 @@
 
     public void rejectMessage(long deliveryTag, boolean requeue)
     {
-        AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
-                                                                  getProtocolMajorVersion(),
-                                                                  getProtocolMinorVersion(),
-                                                                  deliveryTag,
-                                                                  requeue);
+        AMQMethodBody rejectBody = getAMQMethodFactory().createRejectBody(deliveryTag, requeue);
+        sendCommand(rejectBody);
 
-        _connection.getProtocolHandler().writeFrame(basicRejectBody);
     }
 
+    <T extends AMQMethodBody> T sendCommandReceiveResponse(AMQMethodBody command, Class<T> responseClass) throws AMQException
+    {
+        return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, responseClass);
+    }
+    AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+    {
+        return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command);
+    }
+    AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command, long timeout) throws AMQException
+    {
+        return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, timeout);
+    }
+
+
+    void sendCommand(AMQMethodBody command)
+    {
+        getProtocolOutputHandler().sendCommand(_channelId, command);
+    }
 }