You are viewing a plain text version of this content. The canonical, hyperlinked version is held in the commits@qpid.apache.org mailing-list archive.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 16:16:55 UTC

svn commit: r496725 [4/11] - in /incubator/qpid/branches/perftesting/qpid: gentools/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/ gentools/templ.java/ java/ java/broker/ java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ j...

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 16 07:16:39 2007
@@ -477,7 +477,10 @@
             }
 
             // Commits outstanding messages sent and outstanding acknowledgements.
-            _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -492,8 +495,11 @@
         checkTransacted();
         try
         {
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
             _connection.getProtocolHandler().syncWrite(
-                    TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+                    TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
         }
         catch (AMQException e)
         {
@@ -516,8 +522,15 @@
                 try
                 {
                     _connection.getProtocolHandler().closeSession(this);
-                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(
-                            getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+        	        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        	        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        	        // Be aware of possible changes to parameter order as versions change.
+                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        0,	// classId
+                        0,	// methodId
+                        AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+                        "JMS client closing channel");	// replyText
                     _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
                     // When control resumes at this point, a reply will have been received that
                     // indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@
         {
             consumer.clearUnackedMessages();
         }
-        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            false));	// requeue
     }
 
     boolean isInRecovery()
@@ -968,7 +986,7 @@
                 //    ft.put("headers", rawSelector.getDataAsBytes());
                 if (rawSelector != null)
                 {
-                    ft.putAll(rawSelector);
+                    ft.addAll(rawSelector);
                 }
                 BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
                                                                          _messageFactoryRegistry, AMQSession.this,
@@ -1039,7 +1057,20 @@
 
     public void declareExchangeSynch(String name, String type) throws AMQException
     {
-        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            name,	// exchange
+            false,	// internal
+            false,	// nowait
+            false,	// passive
+            0,	// ticket
+            type);	// type
         _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
     }
 
@@ -1050,7 +1081,20 @@
 
     private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
     {
-        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            name,	// exchange
+            false,	// internal
+            true,	// nowait
+            false,	// passive
+            0,	// ticket
+            type);	// type
         protocolHandler.writeFrame(exchangeDeclare);
     }
 
@@ -1072,9 +1116,19 @@
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
 
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
-                                                                false, amqd.isDurable(), amqd.isExclusive(),
-                                                                amqd.isAutoDelete(), true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            amqd.isAutoDelete(),	// autoDelete
+            amqd.isDurable(),	// durable
+            amqd.isExclusive(),	// exclusive
+            true,	// nowait
+            false,	// passive
+            amqd.getQueueName(),	// queue
+            0);	// ticket
 
         protocolHandler.writeFrame(queueDeclare);
         return amqd.getQueueName();
@@ -1082,9 +1136,17 @@
 
     private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
     {
-        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
-                                                          queueName, amqd.getExchangeName(),
-                                                          amqd.getRoutingKey(), true, ft);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            ft,	// arguments
+            amqd.getExchangeName(),	// exchange
+            true,	// nowait
+            queueName,	// queue
+            amqd.getRoutingKey(),	// routingKey
+            0);	// ticket
 
         protocolHandler.writeFrame(queueBind);
     }
@@ -1122,10 +1184,19 @@
 
         try
         {
-            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
-                                                                  queueName, tag, consumer.isNoLocal(),
-                                                                  consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                  consumer.isExclusive(), nowait, arguments);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                arguments,	// arguments
+                tag,	// consumerTag
+                consumer.isExclusive(),	// exclusive
+                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,	// noAck
+                consumer.isNoLocal(),	// noLocal
+                nowait,	// nowait
+                queueName,	// queue
+                0);	// ticket
             if (nowait)
             {
                 protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@
     {
         try
         {
-            AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
-                                                                       false, true);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                false,	// ifEmpty
+                false,	// ifUnused
+                true,	// nowait
+                queueName,	// queue
+                0);	// ticket
             _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
         }
         catch (AMQException e)
@@ -1389,8 +1468,14 @@
 
     boolean isQueueBound(String queueName, String routingKey) throws JMSException
     {
-        AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
-                                                               routingKey, queueName);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            ExchangeDefaults.TOPIC_EXCHANGE_NAME,	// exchange
+            queueName,	// queue
+            routingKey);	// routingKey
         AMQMethodEvent response = null;
         try
         {
@@ -1447,7 +1532,13 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
-        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            deliveryTag,	// deliveryTag
+            multiple);	// multiple
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@
     private void suspendChannel()
     {
         _logger.warn("Suspending channel");
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            false);	// active
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
     private void unsuspendChannel()
     {
         _logger.warn("Unsuspending channel");
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            true);	// active
         _connection.getProtocolHandler().writeFrame(channelFlowFrame);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 16 07:16:39 2007
@@ -448,7 +448,13 @@
             {
                 if(sendClose)
                 {
-                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        _consumerTag,	// consumerTag
+                        false);	// nowait
 
                     try
                     {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Jan 16 07:16:39 2007
@@ -134,9 +134,20 @@
     {
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
-        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
-                                                              destination.getExchangeClass(), false,
-                                                              false, false, false, true, null);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            null,	// arguments
+            false,	// autoDelete
+            false,	// durable
+            destination.getExchangeName(),	// exchange
+            false,	// internal
+            true,	// nowait
+            false,	// passive
+            0,	// ticket
+            destination.getExchangeClass());	// type
         _protocolHandler.writeFrame(declare);
     }
 
@@ -512,8 +523,16 @@
         
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
         message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
-        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
-                                                                destination.getRoutingKey(), mandatory, immediate);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            destination.getExchangeName(),	// exchange
+            immediate,	// immediate
+            mandatory,	// mandatory
+            destination.getRoutingKey(),	// routingKey
+            0);	// ticket
 
         long currentTime = 0;
         if (!_disableTimestamps)
@@ -555,7 +574,9 @@
         }
 
         // weight argument of zero indicates no child content headers, just bodies
-        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
                                                                        contentHeaderProperties,
                                                                        size);
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Jan 16 07:16:39 2007
@@ -22,10 +22,9 @@
 
 import org.apache.qpid.common.QpidProperties;
 
-import java.util.Enumeration;
-
 import javax.jms.ConnectionMetaData;
 import javax.jms.JMSException;
+import java.util.Enumeration;
 
 public class QpidConnectionMetaData implements ConnectionMetaData
 {
@@ -90,7 +89,7 @@
 
     public String getClientVersion()
     {
-        return QpidProperties.getBuildVerision();
+        return QpidProperties.getBuildVersion();
     }
 
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Tue Jan 16 07:16:39 2007
@@ -132,10 +132,6 @@
 			throw new javax.jms.IllegalStateException("Publisher is closed");
 		}
 		
-		if(queue == null){
-			throw new UnsupportedOperationException("Queue is null");
-		}
-		
 		AMQSession session = ((BasicMessageProducer) _delegate).getSession();
 		
 		if(session == null || session.isClosed()){

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -57,7 +57,10 @@
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
         }
 
-        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         evt.getProtocolSession().writeFrame(frame);
         if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -59,7 +59,10 @@
         String reason = method.replyText;
 
         // TODO: check whether channel id of zero is appropriate
-        evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
 
         if (errorCode != 200)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -54,7 +54,12 @@
         {
             // Evaluate server challenge
             byte[] response = client.evaluateChallenge(body.challenge);
-            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                response);	// response
             evt.getProtocolSession().writeFrame(responseFrame);
         }
         catch (SaslException e)

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -124,10 +124,17 @@
             
             clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
             clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
-            clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision());
+            clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
             clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
-            ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
-                                                               saslResponse, selectedLocale));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                clientProperties,	// clientProperties
+                selectedLocale,	// locale
+                mechanism,	// mechanism
+                saslResponse));	// response
         }
         catch (UnsupportedEncodingException e)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -72,11 +72,25 @@
 
     protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
     {
-        return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        return ConnectionOpenBody.createAMQFrame(channel,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            capabilities,	// capabilities
+            insist,	// insist
+            path);	// virtualHost
     }
 
     protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
     {
-        return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        return ConnectionTuneOkBody.createAMQFrame(channel,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            params.getChannelMax(),	// channelMax
+            params.getFrameMax(),	// frameMax
+            params.getHeartbeat());	// heartbeat
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Tue Jan 16 07:16:39 2007
@@ -63,7 +63,7 @@
         }
     }
 
-    private void allocateInitialBuffer()
+    protected void allocateInitialBuffer()
     {
         _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
         _data.setAutoExpand(true);

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Jan 16 07:16:39 2007
@@ -452,10 +452,6 @@
         }
     }
 
-    public Map getUnderlyingMessagePropertiesMap()
-    {
-        return getJmsContentHeaderProperties().getHeaders();
-    }
 
     public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
     {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Tue Jan 16 07:16:39 2007
@@ -22,23 +22,23 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.JMSPropertyFieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import javax.jms.JMSException;
-import java.util.Enumeration;
+import javax.jms.MessageFormatException;
+import java.util.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
 
-public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessage
+public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
 {
     private static final Logger _logger = Logger.getLogger(JMSMapMessage.class);
 
 
     public static final String MIME_TYPE = "jms/map-message";
 
-    private JMSPropertyFieldTable _properties;
+    private Map<String,Object> _map = new HashMap<String, Object>();
 
     JMSMapMessage() throws JMSException
     {
@@ -48,45 +48,30 @@
     JMSMapMessage(ByteBuffer data) throws JMSException
     {
         super(data); // this instantiates a content header
-        _properties = new JMSPropertyFieldTable();
+        populateMapFromData();
     }
 
+
     JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
             throws AMQException
     {
         super(messageNbr, contentHeader, data);
-
-        if (data != null)
+        try
         {
-
-            long tableSize = EncodingUtils.readInteger(_data);
-            try
-            {
-                _properties = new JMSPropertyFieldTable(_data, tableSize);
-            }
-            catch (JMSException e)
-            {
-                Exception error = e.getLinkedException();
-                if (error instanceof AMQFrameDecodingException)
-                {
-                    throw(AMQFrameDecodingException) error;
-                }
-                else
-                {
-                    throw new AMQException(e.getMessage(), e);
-                }
-            }
-        }
-        else
+            populateMapFromData();
+        }                                                        
+        catch (JMSException je)
         {
-            _properties = new JMSPropertyFieldTable();
+            throw new AMQException("Error populating MapMessage from ByteBuffer", je);
+            
         }
+
     }
 
 
     public String toBodyString() throws JMSException
     {
-        return _properties.toString();
+        return _map.toString();
     }
 
     public String getMimeType()
@@ -98,161 +83,437 @@
     public ByteBuffer getData()
     {
         //What if _data is null?
-        _properties.writeToBuffer(_data);
+        writeMapToData();
         return super.getData();
     }
 
+
+
     @Override
     public void clearBodyImpl() throws JMSException
     {
         super.clearBodyImpl();
-        _properties.clear();
+        _map.clear();
     }
 
-    public boolean getBoolean(String string) throws JMSException
+    public boolean getBoolean(String propName) throws JMSException
     {
-        return _properties.getBoolean(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Boolean)
+        {
+            return ((Boolean)value).booleanValue();
+        }
+        else if((value instanceof String) || (value == null))
+        {
+            return Boolean.valueOf((String)value);
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to boolean.");
+        }
+
     }
 
-    public byte getByte(String string) throws JMSException
+    public byte getByte(String propName) throws JMSException
     {
-        return _properties.getByte(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Byte)
+        {
+            return ((Byte)value).byteValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Byte.valueOf((String)value).byteValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to byte.");
+        }
     }
 
-    public short getShort(String string) throws JMSException
+    public short getShort(String propName) throws JMSException
     {
-        return _properties.getShort(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Short)
+        {
+            return ((Short)value).shortValue();
+        }
+        else if(value instanceof Byte)
+        {
+            return ((Byte)value).shortValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Short.valueOf((String)value).shortValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to short.");
+        }
+
     }
 
-    public char getChar(String string) throws JMSException
+
+    public int getInt(String propName) throws JMSException
     {
-    	Character result = _properties.getCharacter(string);
+        Object value = _map.get(propName);
 
-        if (result == null)
+        if(value instanceof Integer)
+        {
+            return ((Integer)value).intValue();
+        }
+        else if(value instanceof Short)
+        {
+            return ((Short)value).intValue();
+        }
+        else if(value instanceof Byte)
         {
-            throw new NullPointerException("getChar couldn't find " + string + " item.");
+            return ((Byte)value).intValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Integer.valueOf((String)value).intValue();
         }
         else
         {
-            return result;
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to int.");
         }
+
     }
 
-    public int getInt(String string) throws JMSException
+    public long getLong(String propName) throws JMSException
     {
-        return _properties.getInteger(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Long)
+        {
+            return ((Long)value).longValue();
+        }
+        else if(value instanceof Integer)
+        {
+            return ((Integer)value).longValue();
+        }
+        if(value instanceof Short)
+        {
+            return ((Short)value).longValue();
+        }
+        if(value instanceof Byte)
+        {
+            return ((Byte)value).longValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Long.valueOf((String)value).longValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to long.");
+        }
+
     }
 
-    public long getLong(String string) throws JMSException
+    public char getChar(String propName) throws JMSException
     {
-        return _properties.getLong(string);
+        Object value = _map.get(propName);
+
+        if(!_map.containsKey(propName))
+        {
+            throw new MessageFormatException("Property " + propName + " not present");
+        }
+        else if(value instanceof Character)
+        {
+            return ((Character)value).charValue();
+        }
+        else if (value == null)
+        {
+            throw new NullPointerException("Property " + propName + " has null value and therefore cannot " +
+                                           "be converted to char.");
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to char.");
+        }
+
     }
 
-    public float getFloat(String string) throws JMSException
+
+
+    public float getFloat(String propName) throws JMSException
     {
-        return _properties.getFloat(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Float)
+        {
+            return ((Float)value).floatValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Float.valueOf((String)value).floatValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to float.");
+        }
     }
 
-    public double getDouble(String string) throws JMSException
+    public double getDouble(String propName) throws JMSException
     {
-        return _properties.getDouble(string);
+        Object value = _map.get(propName);
+
+        if(value instanceof Double)
+        {
+            return ((Double)value).doubleValue();
+        }
+        else if(value instanceof Float)
+        {
+            return ((Float)value).doubleValue();
+        }
+        else if((value instanceof String) || (value==null))
+        {
+            return Double.valueOf((String)value).doubleValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to double.");
+        }
     }
 
-    public String getString(String string) throws JMSException
+    public String getString(String propName) throws JMSException
     {
-        return _properties.getString(string);
+        Object value = _map.get(propName);
+
+        if((value instanceof String) || (value == null))
+        {
+            return (String) value;
+        }
+        else if(value instanceof byte[])
+        {
+            throw new MessageFormatException("Property " + propName + " of type byte[] " +
+                                             "cannot be converted to String.");
+        }
+        else
+        {
+            return value.toString();
+        }
+
     }
 
-    public byte[] getBytes(String string) throws JMSException
+    public byte[] getBytes(String propName) throws JMSException
     {
-        return _properties.getBytes(string);
+        Object value = _map.get(propName);
+
+        if(!_map.containsKey(propName))
+        {
+            throw new MessageFormatException("Property " + propName + " not present");                        
+        }
+        else if((value instanceof byte[]) || (value == null))
+        {
+            return (byte[])value;
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " +
+                                             value.getClass().getName() + " cannot be converted to byte[].");
+        }
     }
 
-    public Object getObject(String string) throws JMSException
+    public Object getObject(String propName) throws JMSException
     {
-        return _properties.getObject(string);
+        return _map.get(propName);
     }
 
     public Enumeration getMapNames() throws JMSException
     {
-        return _properties.getMapNames();
+        return Collections.enumeration(_map.keySet());
     }
 
 
-    public void setBoolean(String string, boolean b) throws JMSException
+    public void setBoolean(String propName, boolean b) throws JMSException
     {
         checkWritable();
-        _properties.setBoolean(string, b);
+        checkPropertyName(propName);
+        _map.put(propName, b);
     }
 
-    public void setByte(String string, byte b) throws JMSException
+    public void setByte(String propName, byte b) throws JMSException
     {
         checkWritable();
-        _properties.setByte(string, b);
+        checkPropertyName(propName);
+        _map.put(propName, b);
     }
 
-    public void setShort(String string, short i) throws JMSException
+    public void setShort(String propName, short i) throws JMSException
     {
         checkWritable();
-        _properties.setShort(string, i);
+        checkPropertyName(propName);
+        _map.put(propName, i);
     }
 
-    public void setChar(String string, char c) throws JMSException
+    public void setChar(String propName, char c) throws JMSException
     {
         checkWritable();
-        _properties.setChar(string, c);
+        checkPropertyName(propName);
+        _map.put(propName, c);
     }
 
-    public void setInt(String string, int i) throws JMSException
+    public void setInt(String propName, int i) throws JMSException
     {
         checkWritable();
-        _properties.setInteger(string, i);
+        checkPropertyName(propName);
+        _map.put(propName, i);
     }
 
-    public void setLong(String string, long l) throws JMSException
+    public void setLong(String propName, long l) throws JMSException
     {
         checkWritable();
-        _properties.setLong(string, l);
+        checkPropertyName(propName);
+        _map.put(propName, l);
     }
 
-    public void setFloat(String string, float v) throws JMSException
+    public void setFloat(String propName, float v) throws JMSException
     {
         checkWritable();
-        _properties.setFloat(string, v);
+        checkPropertyName(propName);
+        _map.put(propName, v);
     }
 
-    public void setDouble(String string, double v) throws JMSException
+    public void setDouble(String propName, double v) throws JMSException
     {
         checkWritable();
-        _properties.setDouble(string, v);
+        checkPropertyName(propName);
+        _map.put(propName, v);
     }
 
-    public void setString(String string, String string1) throws JMSException
+    public void setString(String propName, String string1) throws JMSException
     {
         checkWritable();
-        _properties.setString(string, string1);
+        checkPropertyName(propName);
+        _map.put(propName, string1);
     }
 
-    public void setBytes(String string, byte[] bytes) throws JMSException
+    public void setBytes(String propName, byte[] bytes) throws JMSException
     {
-        this.setBytes(string, bytes, 0, bytes.length);
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, bytes);
     }
 
-    public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
+    public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
     {
+        if((offset == 0) && (length == bytes.length))
+        {
+            setBytes(propName,bytes);
+        }
+        else
+        {
+            byte[] newBytes = new byte[length];
+            System.arraycopy(bytes,offset,newBytes,0,length);
+            setBytes(propName,newBytes);
+        }
+    }
+
+    public void setObject(String propName, Object value) throws JMSException
+    {                                                                                       
         checkWritable();
-        _properties.setBytes(string, bytes, i, i1);
+        checkPropertyName(propName);
+        if(value instanceof Boolean
+                || value instanceof Byte
+                || value instanceof Short
+                || value instanceof Integer
+                || value instanceof Long
+                || value instanceof Character
+                || value instanceof Float
+                || value instanceof Double
+                || value instanceof String
+                || value instanceof byte[]                
+                || value == null)
+        {
+            _map.put(propName, value);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot set property " + propName + " to value " + value +
+                                             " of type " + value.getClass().getName() + ".");
+        }
     }
 
-    public void setObject(String string, Object object) throws JMSException
+    private void checkPropertyName(String propName)
     {
-        checkWritable();
-        _properties.setObject(string, object);
+        if(propName == null || propName.equals(""))
+        {
+            throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+        }
+    }
+
+    public boolean itemExists(String propName) throws JMSException
+    {
+        return _map.containsKey(propName);
+    }
+
+
+    private void populateMapFromData() throws JMSException
+    {
+        if(_data != null)
+        {
+            _data.rewind();
+
+            final int entries = readIntImpl();
+            for(int i = 0; i < entries; i++)
+            {
+                String propName = readStringImpl();
+                Object value = readObject();
+                _map.put(propName,value);
+            }
+        }
+        else
+        {
+            _map.clear();
+        }
     }
 
-    public boolean itemExists(String string) throws JMSException
+    private void writeMapToData()
     {
-        return _properties.itemExists(string);
+        allocateInitialBuffer();
+        final int size = _map.size();
+        writeIntImpl(size);
+        for(Map.Entry<String, Object> entry : _map.entrySet())
+        {
+            try
+            {
+                writeStringImpl(entry.getKey());
+            }
+            catch (CharacterCodingException e)
+            {
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e);
+
+
+            }
+            try
+            {
+                writeObject(entry.getValue());
+            }
+            catch (JMSException e)
+            {
+                Object value = entry.getValue();
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() +
+                        " value : " + value + " (type: " + value.getClass().getName() + ").",e);
+            }
+        }
+
     }
+
+
+
 
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Jan 16 07:16:39 2007
@@ -31,31 +31,10 @@
 /**
  * @author Apache Software Foundation
  */
-public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
+public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
 {
     public static final String MIME_TYPE="jms/stream-message";
 
-    private static final byte BOOLEAN_TYPE = (byte) 1;
-
-    private static final byte BYTE_TYPE = (byte) 2;
-
-    private static final byte BYTEARRAY_TYPE = (byte) 3;
-
-    private static final byte SHORT_TYPE = (byte) 4;
-
-    private static final byte CHAR_TYPE = (byte) 5;
-
-    private static final byte INT_TYPE = (byte) 6;
-
-    private static final byte LONG_TYPE = (byte) 7;
-
-    private static final byte FLOAT_TYPE = (byte) 8;
-
-    private static final byte DOUBLE_TYPE = (byte) 9;
-
-    private static final byte STRING_TYPE = (byte) 10;
-
-    private static final byte NULL_STRING_TYPE = (byte) 11;
 
     /**
      * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
@@ -97,128 +76,22 @@
         return MIME_TYPE;
     }
 
-    private byte readWireType() throws MessageFormatException, MessageEOFException,
-            MessageNotReadableException
-    {
-        checkReadable();
-        checkAvailable(1);
-        return _data.get();
-    }
 
-    private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
-    {
-        checkWritable();
-        _data.put(type);
-        _changedData = true;
-    }
 
     public boolean readBoolean() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        boolean result;
-        try
-        {
-            switch (wireType)
-            {
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = readBooleanImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Boolean.parseBoolean(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
+        return super.readBoolean();
     }
 
-    private boolean readBooleanImpl()
-    {
-        return _data.get() != 0;
-    }
 
     public byte readByte() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        byte result;
-        try
-        {
-            switch (wireType)
-            {
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Byte.parseByte(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
-            }
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-        return result;
-    }
-
-    private byte readByteImpl()
-    {
-        return _data.get();
+        return super.readByte();
     }
 
     public short readShort() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        short result;
-        try
-        {
-            switch (wireType)
-            {
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Short.parseShort(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a short");
-            }
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-        return result;
-    }
-
-    private short readShortImpl()
-    {
-        return _data.getShort();
+        return super.readShort();
     }
 
     /**
@@ -229,564 +102,102 @@
      */
     public char readChar() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        try
-        {
-        	if(wireType == NULL_STRING_TYPE){
-        		throw new NullPointerException();
-        	}
-        	
-            if (wireType != CHAR_TYPE)
-            {
-                _data.position(position);
-                throw new MessageFormatException("Unable to convert " + wireType + " to a char");
-            }
-            else
-            {
-                checkAvailable(2);
-                return readCharImpl();
-            }
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private char readCharImpl()
-    {
-        return _data.getChar();
+        return super.readChar();
     }
 
     public int readInt() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        int result;
-        try
-        {
-            switch (wireType)
-            {
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Integer.parseInt(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to an int");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private int readIntImpl()
-    {
-        return _data.getInt();
+        return super.readInt();
     }
 
     public long readLong() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        long result;
-        try
-        {
-            switch (wireType)
-            {
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = readLongImpl();
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Long.parseLong(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a long");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private long readLongImpl()
-    {
-        return _data.getLong();
+        return super.readLong();
     }
 
     public float readFloat() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        float result;
-        try
-        {
-            switch (wireType)
-            {
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Float.parseFloat(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a float");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private float readFloatImpl()
-    {
-        return _data.getFloat();
+        return super.readFloat();
     }
 
     public double readDouble() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        double result;
-        try
-        {
-            switch (wireType)
-            {
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = readDoubleImpl();
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Double.parseDouble(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a double");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private double readDoubleImpl()
-    {
-        return _data.getDouble();
+        return super.readDouble();
     }
 
     public String readString() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        String result;
-        try
-        {
-            switch (wireType)
-            {
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = readStringImpl();
-                    break;
-                case NULL_STRING_TYPE:
-                    result = null;
-                    throw new NullPointerException("data is null");
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = String.valueOf(readBooleanImpl());
-                    break;
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = String.valueOf(readLongImpl());
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = String.valueOf(readIntImpl());
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = String.valueOf(readShortImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = String.valueOf(readByteImpl());
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = String.valueOf(readFloatImpl());
-                    break;
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = String.valueOf(readDoubleImpl());
-                    break;
-                case CHAR_TYPE:
-                    checkAvailable(2);
-                    result = String.valueOf(readCharImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a String");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private String readStringImpl() throws JMSException
-    {
-        try
-        {
-            return _data.getString(Charset.forName("UTF-8").newDecoder());
-        }
-        catch (CharacterCodingException e)
-        {
-            JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
-            je.setLinkedException(e);
-            throw je;
-        }
+        return super.readString();
     }
 
     public int readBytes(byte[] bytes) throws JMSException
     {
-        if (bytes == null)
-        {
-            throw new IllegalArgumentException("byte array must not be null");
-        }
-        checkReadable();
-        // first call
-        if (_byteArrayRemaining == -1)
-        {
-            // type discriminator checked separately so you get a MessageFormatException rather than
-            // an EOF even in the case where both would be applicable
-            checkAvailable(1);
-            byte wireType = readWireType();
-            if (wireType != BYTEARRAY_TYPE)
-            {
-                throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
-            }
-            checkAvailable(4);
-            int size = _data.getInt();
-            // size of -1 indicates null
-            if (size == -1)
-            {
-                return -1;
-            }
-            else
-            {
-                if (size > _data.remaining())
-                {
-                    throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " +
-                                                  _data.remaining() + " bytes");
-                }
-                else
-                {
-                    _byteArrayRemaining = size;
-                }
-            }
-        }
-        else if (_byteArrayRemaining == 0)
-        {
-            _byteArrayRemaining = -1;
-            return -1;
-        }
-
-        int returnedSize = readBytesImpl(bytes);
-        if (returnedSize < bytes.length)
-        {
-            _byteArrayRemaining = -1;
-        }
-        return returnedSize;
-    }
-
-    private int readBytesImpl(byte[] bytes)
-    {
-        int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
-        _byteArrayRemaining -= count;
-
-        if (count == 0)
-        {
-            return 0;
-        }
-        else
-        {
-            _data.get(bytes, 0, count);
-            return count;
-        }
+        return super.readBytes(bytes);
     }
 
+
     public Object readObject() throws JMSException
     {
-        int position = _data.position();
-        byte wireType = readWireType();
-        Object result = null;
-        try
-        {
-            switch (wireType)
-            {
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = readBooleanImpl();
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                case BYTEARRAY_TYPE:
-                    checkAvailable(4);
-                    int size = _data.getInt();
-                    if (size == -1)
-                    {
-                        result = null;
-                    }
-                    else
-                    {
-                        _byteArrayRemaining = size;
-                        result = new byte[size];
-                        readBytesImpl(new byte[size]);
-                    }
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case CHAR_TYPE:
-                    checkAvailable(2);
-                    result = readCharImpl();
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = readLongImpl();
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = readDoubleImpl();
-                    break;
-                case NULL_STRING_TYPE:
-                    result = null;
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = readStringImpl();
-                    break;
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
+        return super.readObject();
     }
 
     public void writeBoolean(boolean b) throws JMSException
     {
-        writeTypeDiscriminator(BOOLEAN_TYPE);
-        _data.put(b ? (byte) 1 : (byte) 0);
+        super.writeBoolean(b);
     }
 
     public void writeByte(byte b) throws JMSException
     {
-        writeTypeDiscriminator(BYTE_TYPE);
-        _data.put(b);
+        super.writeByte(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
-        writeTypeDiscriminator(SHORT_TYPE);
-        _data.putShort(i);
+        super.writeShort(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
-        writeTypeDiscriminator(CHAR_TYPE);
-        _data.putChar(c);
+        super.writeChar(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
-        writeTypeDiscriminator(INT_TYPE);
-        _data.putInt(i);
+        super.writeInt(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
-        writeTypeDiscriminator(LONG_TYPE);
-        _data.putLong(l);
+        super.writeLong(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
-        writeTypeDiscriminator(FLOAT_TYPE);
-        _data.putFloat(v);
+        super.writeFloat(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
-        writeTypeDiscriminator(DOUBLE_TYPE);
-        _data.putDouble(v);
+        super.writeDouble(v);
     }
 
     public void writeString(String string) throws JMSException
     {
-        if (string == null)
-        {
-            writeTypeDiscriminator(NULL_STRING_TYPE);
-        }
-        else
-        {
-            writeTypeDiscriminator(STRING_TYPE);
-            try
-            {
-                _data.putString(string, Charset.forName("UTF-8").newEncoder());
-                // we must write the null terminator ourselves
-                _data.put((byte) 0);
-            }
-            catch (CharacterCodingException e)
-            {
-                JMSException ex = new JMSException("Unable to encode string: " + e);
-                ex.setLinkedException(e);
-                throw ex;
-            }
-        }
+        super.writeString(string);
     }
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+        super.writeBytes(bytes);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
-        writeTypeDiscriminator(BYTEARRAY_TYPE);
-        if (bytes == null)
-        {
-            _data.putInt(-1);
-        }
-        else
-        {
-            _data.putInt(length);
-            _data.put(bytes, offset, length);
-        }
+        super.writeBytes(bytes,offset,length);
     }
 
     public void writeObject(Object object) throws JMSException
     {
-        checkWritable();
-        Class clazz = null;
-        if (object == null)
-        {
-            // string handles the output of null values
-            clazz = String.class;
-        }
-        else
-        {
-            clazz = object.getClass();
-        }
-
-        if (clazz == Byte.class)
-        {
-            writeByte((Byte) object);
-        }
-        else if (clazz == Boolean.class)
-        {
-            writeBoolean((Boolean) object);
-        }
-        else if (clazz == byte[].class)
-        {
-            writeBytes((byte[]) object);
-        }
-        else if (clazz == Short.class)
-        {
-            writeShort((Short) object);
-        }
-        else if (clazz == Character.class)
-        {
-            writeChar((Character) object);
-        }
-        else if (clazz == Integer.class)
-        {
-            writeInt((Integer) object);
-        }
-        else if (clazz == Long.class)
-        {
-            writeLong((Long) object);
-        }
-        else if (clazz == Float.class)
-        {
-            writeFloat((Float) object);
-        }
-        else if (clazz == Double.class)
-        {
-            writeDouble((Double) object);
-        }
-        else if (clazz == String.class)
-        {
-            writeString((String) object);
-        }
-        else
-        {
-            throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
-        }
+        super.writeObject(object);
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jan 16 07:16:39 2007
@@ -478,8 +478,15 @@
     {
         _stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
-        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
-                0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            0,	// classId
+            0,	// methodId
+            AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+            "JMS client is closing the connection.");	// replyText
         syncWrite(frame, ConnectionCloseOkBody.class);
 
         _protocolSession.closeProtocolSession();

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Tue Jan 16 07:16:39 2007
@@ -49,10 +49,10 @@
         final VmPipeConnector ioConnector = new VmPipeConnector();
         final IoServiceConfig cfg = ioConnector.getDefaultConfig();
         ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
-        PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+        PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
                                                     "AsynchronousReadFilter");
         cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
-        PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+        PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, 
                                                      "AsynchronousWriteFilter");
         cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
         

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,7 @@
     {
         _connection = connection;
         _destination = destination;
-        _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         //set up a slow consumer
         _session.createConsumer(destination).setMessageListener(this);
@@ -1111,6 +1111,9 @@
             Assert.fail("Message should be writeable");
         }
     }
+
+    
+
 
     private void testMapValues(JMSMapMessage m, int count) throws JMSException
     {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java Tue Jan 16 07:16:39 2007
@@ -11,7 +11,6 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.PropertyFieldTable;
 
 import javax.jms.*;
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Jan 16 07:16:39 2007
@@ -17,7 +17,6 @@
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.PropertyFieldTable;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
@@ -53,7 +52,7 @@
 
 
         AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
-        FieldTable ft = new PropertyFieldTable();
+        FieldTable ft = new FieldTable();
         ft.setString("F1000","1");
         MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
 

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Tue Jan 16 07:16:39 2007
@@ -112,7 +112,9 @@
 
     private void ping(Broker b) throws AMQException
     {
-        ClusterPingBody ping = new ClusterPingBody();
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
         ping.broker = _group.getLocal().getDetails();
         ping.responseRequired = true;
         ping.load = _loadTable.getLocalLoad();
@@ -158,7 +160,9 @@
 
         Broker leader = connectToLeader(member);
         _logger.info(new LogMessage("Connected to {0}. joining", leader));
-        ClusterJoinBody join = new ClusterJoinBody();
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
         join.broker = _group.getLocal().getDetails();
         send(leader, new SimpleSendable(join));
     }
@@ -177,7 +181,9 @@
 
     public void leave() throws AMQException
     {
-        ClusterLeaveBody leave = new ClusterLeaveBody();
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
         leave.broker = _group.getLocal().getDetails();
         send(getLeader(), new SimpleSendable(leave));
     }
@@ -198,7 +204,9 @@
         }
         else
         {
-            ClusterSuspectBody suspect = new ClusterSuspectBody();
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
             suspect.broker = broker.getDetails();
             send(getLeader(), new SimpleSendable(suspect));
         }
@@ -220,7 +228,9 @@
         else
         {
             //pass request on to leader:
-            ClusterJoinBody request = new ClusterJoinBody();
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
             request.broker = member.getDetails();
             Broker leader = getLeader();
             send(leader, new SimpleSendable(request));
@@ -265,7 +275,9 @@
 
     private ClusterMembershipBody createAnnouncement(String membership)
     {
-        ClusterMembershipBody announce = new ClusterMembershipBody();
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
         //TODO: revise this way of converting String to bytes...
         announce.members = membership.getBytes();
         return announce;

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java Tue Jan 16 07:16:39 2007
@@ -48,7 +48,13 @@
         if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
-            session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
+            	(byte)8, (byte)0,	// AMQP version (major, minor)
+            	evt.getMethod().queue	// consumerTag
+                ));
         }
         else
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java Tue Jan 16 07:16:39 2007
@@ -51,7 +51,9 @@
     {
         for(String queue : _counts.keySet())
         {
-            BasicConsumeBody m = new BasicConsumeBody();
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0);
             m.queue = queue;
             m.consumerTag = queue;
             replay(m, messages);