You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2006/12/22 18:00:31 UTC

svn commit: r489691 [1/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid...

Author: kpvdr
Date: Fri Dec 22 09:00:28 2006
New Revision: 489691

URL: http://svn.apache.org/viewvc?view=rev&rev=489691
Log:
AMQP version using new generator - Part 1. In these changes, every place where version-specific info is required has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
    incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
    incubator/qpid/trunk/qpid/java/common/pom.xml
    incubator/qpid/trunk/qpid/java/common/protocol-version.xml
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    incubator/qpid/trunk/qpid/java/common/src/main/xsl/cluster.asl
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,11 @@
 
     public CompositeAMQDataBlock getReturnMessage(int channel)
     {
-        BasicReturnBody returnBody = new BasicReturnBody();
+	    // AMQP version change: All generated *Body classes are now version-aware.
+        // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now.
+        // TODO: Connect the version to that returned by the ProtocolInitiation
+        // for this session.
+        BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0);
         returnBody.exchange = _publishBody.exchange;
         returnBody.replyCode = getReplyCode();
         returnBody.replyText = _message;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -54,7 +54,12 @@
         channel.unsubscribeConsumer(protocolSession, body.consumerTag);
         if(!body.nowait)
         {
-            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                body.consumerTag);	// consumerTag
             protocolSession.writeFrame(responseFrame);
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,12 @@
                                                               body.arguments, body.noLocal);
                 if (!body.nowait)
                 {
-                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        consumerTag));		// consumerTag
                 }
 
                 //now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@
             catch (AMQInvalidSelectorException ise)
             {
                 _log.info("Closing connection due to invalid selector");
-                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
-                                                                   ise.getMessage(), BasicConsumeBody.CLASS_ID,
-                                                                   BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.INVALID_SELECTOR.getCode(),	// replyCode
+                    ise.getMessage()));		// replyText
             }
             catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
-                                                                      BasicConsumeBody.CLASS_ID,
-                                                                      BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    msg));	// replyText
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -64,7 +64,15 @@
             protocolSession.closeChannel(evt.getChannelId());
             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
             // then we can remove the hardcoded 0,0
-            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                ChannelCloseBody.getClazz((byte)8, (byte)0),	// classId
+                ChannelCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                500,	// replyCode
+                "Unknown exchange name");	// replyText
             protocolSession.writeFrame(cf);
         }
         else

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Fri Dec 22 09:00:28 2006
@@ -44,6 +44,9 @@
                                AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
     {
         session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
-        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Dec 22 09:00:28 2006
@@ -55,7 +55,10 @@
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
         protocolSession.closeChannel(evt.getChannelId());
-        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Fri Dec 22 09:00:28 2006
@@ -58,6 +58,12 @@
         channel.setSuspended(!body.active);
         _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
 
-        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            body.active);	// active
         protocolSession.writeFrame(response);
-    }}
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Fri Dec 22 09:00:28 2006
@@ -55,7 +55,10 @@
         final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
                                                   exchangeRegistry);
         protocolSession.addChannel(channel);
-        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -62,7 +62,10 @@
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
-        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -64,7 +64,12 @@
             contextKey = generateClientID();
         }
         protocolSession.setContextKey(contextKey);
-        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            contextKey);	// knownHosts
         stateManager.changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeFrame(response);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -75,25 +75,43 @@
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
                 stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
-                        AMQConstant.NOT_ALLOWED.getName(),
-                        ConnectionCloseBody.CLASS_ID,
-                        ConnectionCloseBody.METHOD_ID);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    ConnectionCloseBody.getClazz((byte)8, (byte)0),		// classId
+                    ConnectionCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    AMQConstant.NOT_ALLOWED.getName());	// replyText
                 protocolSession.writeFrame(close);
                 disposeSaslServer(protocolSession);
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
-                        ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
-                        HeartbeatConfig.getInstance().getDelay());
+                // TODO: Check the value of channelMax here: This should be the max
+                // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+                // not Integer.MAX_VALUE (which is signed 4 bytes).
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    Integer.MAX_VALUE,	// channelMax
+                    ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
+                    HeartbeatConfig.getInstance().getDelay());	// heartbeat
                 protocolSession.writeFrame(tune);
                 disposeSaslServer(protocolSession);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    authResult.challenge);	// challenge
                 protocolSession.writeFrame(challenge);
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -92,13 +92,24 @@
                     _logger.info("Connected as: " + ss.getAuthorizationID());
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
-                                                                      HeartbeatConfig.getInstance().getDelay());
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        Integer.MAX_VALUE,	// channelMax
+                        getConfiguredFrameSize(),	// frameMax
+                        HeartbeatConfig.getInstance().getDelay());	// heartbeat
                     protocolSession.writeFrame(tune);
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        authResult.challenge);	// challenge
                     protocolSession.writeFrame(challenge);
             }
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Dec 22 09:00:28 2006
@@ -64,6 +64,11 @@
                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        byte major = (byte)8;
+        byte minor = (byte)0;
+        
         ExchangeBoundBody body = evt.getMethod();
 
         String exchangeName = body.exchange;
@@ -77,8 +82,11 @@
         AMQFrame response;
         if (exchange == null)
         {
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
-                                                             "Exchange " + exchangeName + " not found");
+            // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                major, minor,	// AMQP version (major, minor)
+                EXCHANGE_NOT_FOUND,	// replyCode
+                "Exchange " + exchangeName + " not found");	// replyText
         }
         else if (routingKey == null)
         {
@@ -86,11 +94,19 @@
             {
                 if (exchange.hasBindings())
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        NO_BINDINGS,	// replyCode
+                        null);	// replyText
                 }
             }
             else
@@ -98,20 +114,29 @@
                 AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                      "Queue " + queueName + " not found");
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        QUEUE_NOT_FOUND,	// replyCode
+                        "Queue " + queueName + " not found");	// replyText
                 }
                 else
                 {
                     if (exchange.isBound(queue))
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            OK,	// replyCode
+                            null);	// replyText
                     }
                     else
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
-                                                                      "Queue " + queueName + " not bound to exchange " +
-                                                                      exchangeName);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            QUEUE_NOT_BOUND,	// replyCode
+                            "Queue " + queueName + " not bound to exchange " + exchangeName);	// replyText
                     }
                 }
             }
@@ -121,24 +146,30 @@
             AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                  "Queue " + queueName + " not found");
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    QUEUE_NOT_FOUND,	// replyCode
+                    "Queue " + queueName + " not found");	// replyText
             }
             else
             {
                 if (exchange.isBound(body.routingKey, queue))
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                     null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                     response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                     SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
-                                                                     "Queue " + queueName +
-                                                                     " not bound with routing key " +
-                                                                     body.routingKey + " to exchange " +
-                                                                     exchangeName);
+                        major, minor,	// AMQP version (major, minor)
+                        SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+                        "Queue " + queueName + " not bound with routing key " +
+                        body.routingKey + " to exchange " + exchangeName);	// replyText
                 }
             }
         }
@@ -146,16 +177,20 @@
         {
             if (exchange.isBound(body.routingKey))
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                 null);
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    OK,	// replyCode
+                    null);	// replyText
             }
             else
             {
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                 response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                 NO_QUEUE_BOUND_WITH_RK,
-                                                                 "No queue bound with routing key " +
-                                                                 body.routingKey + " to exchange " +
-                                                                 exchangeName);
+                    major, minor,	// AMQP version (major, minor)
+                    NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                    "No queue bound with routing key " + body.routingKey +
+                    " to exchange " + exchangeName);	// replyText
             }
         }
         protocolSession.writeFrame(response);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Dec 22 09:00:28 2006
@@ -75,7 +75,10 @@
         }
         if(!body.nowait)
         {
-            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Dec 22 09:00:28 2006
@@ -53,7 +53,10 @@
         try
         {
             exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
         catch (ExchangeInUseException e)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Dec 22 09:00:28 2006
@@ -90,7 +90,10 @@
         }
         if (!body.nowait)
         {
-            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Dec 22 09:00:28 2006
@@ -102,7 +102,14 @@
         }
         if (!body.nowait)
         {
-            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                0L, // consumerCount
+                0L, // messageCount
+                body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeFrame(response);
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,12 @@
         {
             int purged = queue.delete(body.ifUnused, body.ifEmpty);
             _store.removeQueue(queue.getName());
-            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                purged));	// messageCount
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Dec 22 09:00:28 2006
@@ -52,7 +52,10 @@
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.commit();
-            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             channel.processReturns(protocolSession);            
         }catch(AMQException e){
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Dec 22 09:00:28 2006
@@ -51,7 +51,10 @@
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.rollback();
-            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             channel.resend(protocolSession);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Fri Dec 22 09:00:28 2006
@@ -48,6 +48,9 @@
                                AMQMethodEvent<TxSelectBody> evt) throws AMQException
     {
         protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
-        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Dec 22 09:00:28 2006
@@ -165,8 +165,17 @@
                 _minor = pi.protocolMinor;
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
-                                                                       mechanisms.getBytes(), locales.getBytes());
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+            		(byte)8, (byte)0,	// AMQP version (major, minor)
+                    locales.getBytes(),	// locales
+                    mechanisms.getBytes(),	// mechanisms
+                    null,	// serverProperties
+                	(short)8,	// versionMajor
+                    (short)0	// versionMinor
+                    );
                 _minaProtocolSession.write(response);
             }
             catch (AMQException e)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Dec 22 09:00:28 2006
@@ -168,11 +168,20 @@
         }
         else if(throwable instanceof IOException)
         {
-            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);            
+            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
         }
         else
         {
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+            	(byte)8, (byte)0,	// AMQP version (major, minor)
+            	0,	// classId
+                0,	// methodId
+                200,	// replyCode
+                throwable.getMessage()	// replyText
+                ));
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
             protocolSession.close();
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Dec 22 09:00:28 2006
@@ -193,8 +193,16 @@
     public void closeConnection() throws JMException
     {
         
-        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
-                                                      "Broker Management Console has closing the connection.", 0, 0);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            0,	// classId
+            0,	// methodId
+        	AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+            "Broker Management Console has closing the connection."	// replyText
+            );
         _session.writeFrame(response);
 
         try

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Dec 22 09:00:28 2006
@@ -157,10 +157,20 @@
 
     public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
     {
+        
         AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
 
-        allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered,
-                                                       getExchangeName(), getRoutingKey());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+        	(byte)8, (byte)0,	// AMQP version (major, minor)
+            consumerTag,	// consumerTag
+        	deliveryTag,	// deliveryTag
+            getExchangeName(),	// exchange
+            _redelivered,	// redelivered
+            getRoutingKey()	// routingKey
+            );
         allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
         for (int i = 2; i < allFrames.length; i++)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Dec 22 09:00:28 2006
@@ -379,7 +379,13 @@
         if (!_closed)
         {
             _logger.info("Closing autoclose subscription:" + this);
-            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+        		(byte)8, (byte)0,	// AMQP version (major, minor)
+            	consumerTag	// consumerTag
+                ));
             _closed = true;
         }
     }
@@ -392,9 +398,17 @@
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
-                                                                deliveryTag, false, exchange,
-                                                                routingKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+        	(byte)8, (byte)0,	// AMQP version (major, minor)
+            consumerTag,	// consumerTag
+        	deliveryTag,	// deliveryTag
+            exchange,	// exchange
+            false,	// redelivered
+            routingKey	// routingKey
+            );
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
         buf.flip();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Dec 22 09:00:28 2006
@@ -465,12 +465,25 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+            ChannelOpenBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                null),	// outOfBand
+                ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+            BasicQosBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                false,	// global
+                prefetchHigh,	// prefetchCount
+                0),	// prefetchSize
                 BasicQosOkBody.class);
 
         if (transacted)
@@ -479,7 +492,10 @@
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Dec 22 09:00:28 2006
@@ -477,7 +477,10 @@
             }
 
             // Commits outstanding messages sent and outstanding acknowledgements.
-            _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -492,8 +495,11 @@
         checkTransacted();
         try
         {
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
             _connection.getProtocolHandler().syncWrite(
-                    TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+                    TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
         }
         catch (AMQException e)
         {
@@ -516,8 +522,15 @@
                 try
                 {
                     _connection.getProtocolHandler().closeSession(this);
-                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(
-                            getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+        	        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        	        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        	        // Be aware of possible changes to parameter order as versions change.
+                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        0,	// classId
+                        0,	// methodId
+                        AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+                        "JMS client closing channel");	// replyText
                     _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
                     // When control resumes at this point, a reply will have been received that
                     // indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@
         {
             consumer.clearUnackedMessages();
         }
-        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            false));	// requeue
     }
 
     boolean isInRecovery()
@@ -1039,7 +1057,20 @@
 
     public void declareExchangeSynch(String name, String type) throws AMQException
     {
-        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            name,	// exchange
+            false,	// internal
+            false,	// nowait
+            false,	// passive
+            0,	// ticket
+            type);	// type
         _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
     }
 
@@ -1050,7 +1081,20 @@
 
     private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
     {
-        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            name,	// exchange
+            false,	// internal
+            true,	// nowait
+            false,	// passive
+            0,	// ticket
+            type);	// type
         protocolHandler.writeFrame(exchangeDeclare);
     }
 
@@ -1072,9 +1116,19 @@
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
 
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
-                                                                false, amqd.isDurable(), amqd.isExclusive(),
-                                                                amqd.isAutoDelete(), true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            amqd.isAutoDelete(),	// autoDelete
+            amqd.isDurable(),	// durable
+            amqd.isExclusive(),	// exclusive
+            true,	// nowait
+            false,	// passive
+            amqd.getQueueName(),	// queue
+            0);	// ticket
 
         protocolHandler.writeFrame(queueDeclare);
         return amqd.getQueueName();
@@ -1082,9 +1136,17 @@
 
     private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
     {
-        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
-                                                          queueName, amqd.getExchangeName(),
-                                                          amqd.getRoutingKey(), true, ft);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            ft,	// arguments
+            amqd.getExchangeName(),	// exchange
+            true,	// nowait
+            queueName,	// queue
+            amqd.getRoutingKey(),	// routingKey
+            0);	// ticket
 
         protocolHandler.writeFrame(queueBind);
     }
@@ -1122,10 +1184,19 @@
 
         try
         {
-            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
-                                                                  queueName, tag, consumer.isNoLocal(),
-                                                                  consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                  consumer.isExclusive(), nowait, arguments);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                arguments,	// arguments
+                tag,	// consumerTag
+                consumer.isExclusive(),	// exclusive
+                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,	// noAck
+                consumer.isNoLocal(),	// noLocal
+                nowait,	// nowait
+                queueName,	// queue
+                0);	// ticket
             if (nowait)
             {
                 protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@
     {
         try
         {
-            AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
-                                                                       false, true);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                false,	// ifEmpty
+                false,	// ifUnused
+                true,	// nowait
+                queueName,	// queue
+                0);	// ticket
             _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
         }
         catch (AMQException e)
@@ -1389,8 +1468,14 @@
 
     boolean isQueueBound(String queueName, String routingKey) throws JMSException
     {
-        AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                                                               routingKey, queueName);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            ExchangeDefaults.TOPIC_EXCHANGE_NAME,	// exchange
+            queueName,	// queue
+            routingKey);	// routingKey
         AMQMethodEvent response = null;
         try
         {
@@ -1447,7 +1532,13 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
-        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            deliveryTag,	// deliveryTag
+            multiple);	// multiple
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@
     private void suspendChannel()
     {
         _logger.warn("Suspending channel");
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            false);	// active
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
     private void unsuspendChannel()
     {
         _logger.warn("Unsuspending channel");
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            true);	// active
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Dec 22 09:00:28 2006
@@ -448,7 +448,13 @@
             {
                 if(sendClose)
                 {
-                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        _consumerTag,	// consumerTag
+                        false);	// nowait
 
                     try
                     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Dec 22 09:00:28 2006
@@ -134,9 +134,20 @@
     {
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
-        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
-                                                              destination.getExchangeClass(), false,
-                                                              false, false, false, true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            destination.getExchangeName(),	// exchange
+            false,	// internal
+            true,	// nowait
+            false,	// passive
+            0,	// ticket
+            destination.getExchangeClass());	// type
         _protocolHandler.writeFrame(declare);
     }
 
@@ -512,8 +523,16 @@
         
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
         message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
-        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
-                                                                destination.getRoutingKey(), mandatory, immediate);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            destination.getExchangeName(),	// exchange
+            immediate,	// immediate
+            mandatory,	// mandatory
+            destination.getRoutingKey(),	// routingKey
+            0);	// ticket
 
         long currentTime = 0;
         if (!_disableTimestamps)
@@ -555,7 +574,9 @@
         }
 
         // weight argument of zero indicates no child content headers, just bodies
-        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
                                                                        contentHeaderProperties,
                                                                        size);
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -57,7 +57,10 @@
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
         }
 
-        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         evt.getProtocolSession().writeFrame(frame);
         if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -59,7 +59,10 @@
         String reason = method.replyText;
 
         // TODO: check whether channel id of zero is appropriate
-        evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
 
         if (errorCode != 200)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -54,7 +54,12 @@
         {
             // Evaluate server challenge
             byte[] response = client.evaluateChallenge(body.challenge);
-            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                response);	// response
             evt.getProtocolSession().writeFrame(responseFrame);
         }
         catch (SaslException e)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -126,8 +126,15 @@
             clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
             clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
             clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
-            ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
-                                                               saslResponse, selectedLocale));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                clientProperties,	// clientProperties
+                selectedLocale,	// locale
+                mechanism,	// mechanism
+                saslResponse));	// response
         }
         catch (UnsupportedEncodingException e)
         {