You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:04:48 UTC

svn commit: r501005 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ handler/ message/ protocol/

Author: rgreig
Date: Mon Jan 29 03:04:43 2007
New Revision: 501005

URL: http://svn.apache.org/viewvc?view=rev&rev=501005
Log:
QPID-324 : Patch supplied by Rob Godfrey - Only send byte indicating topic / queue / other in properties field table, not whole destination


Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Mon Jan 29 03:04:43 2007
@@ -48,6 +48,12 @@
         this(name, false);
     }
 
+    public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
+    {
+        super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+              false, queueName, false);    }
+
+
     /**
      * Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
      * @param name the name of the queue

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Jan 29 03:04:43 2007
@@ -44,6 +44,12 @@
         this(new AMQShortString(name));
     }
 
+    public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+    {
+        super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
+    }
+
+
     public AMQTopic(AMQShortString name)
     {
         this(name, true, null, false);

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java?view=auto&rev=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java Mon Jan 29 03:04:43 2007
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQUndefinedDestination extends AMQDestination
+{
+
+    private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
+
+
+    public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+    {
+        super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
+    }
+
+    public AMQShortString getRoutingKey()
+    {
+        return getDestinationName();  
+    }
+
+    public boolean isNameRequired()
+    {
+        return getAMQQueueName() == null;
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Jan 29 03:04:43 2007
@@ -441,8 +441,24 @@
 
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
-        message.getJmsHeaders().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.
-                                                         getShortStringName(), destination.toByteEncoding());
+
+        byte type;
+        if(destination instanceof Topic)
+        {
+            type = AMQDestination.TOPIC_TYPE;
+        }
+        else if(destination instanceof Queue)
+        {
+            type = AMQDestination.QUEUE_TYPE;
+        }
+        else
+        {
+            type = AMQDestination.UNKNOWN_TYPE;
+        }
+
+        message.getJmsHeaders().setByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName(),
+                                               type);
+
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Mon Jan 29 03:04:43 2007
@@ -26,7 +26,7 @@
 
 public enum CustomJMSXProperty
 {
-    JMSX_QPID_JMSDESTINATIONURL,
+    JMSZ_QPID_DESTTYPE,    
     JMSXGroupID,
     JMSXGroupSeq;
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Mon Jan 29 03:04:43 2007
@@ -42,9 +42,7 @@
 
     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
-        final UnprocessedMessage msg = new UnprocessedMessage();
-        msg.deliverBody = (BasicDeliverBody) evt.getMethod();
-        msg.channelId = evt.getChannelId();
+        final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
         _logger.debug("New JmsDeliver method received");
         protocolSession.unprocessedMessageReceived(msg);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Mon Jan 29 03:04:43 2007
@@ -43,10 +43,7 @@
     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
         _logger.debug("New JmsBounce method received");
-        final UnprocessedMessage msg = new UnprocessedMessage();
-        msg.deliverBody = null;
-        msg.bounceBody = (BasicReturnBody) evt.getMethod();
-        msg.channelId = evt.getChannelId();
+        final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
 
         protocolSession.unprocessedMessageReceived(msg);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
@@ -69,11 +70,11 @@
         _data.setAutoExpand(true);
     }
 
-    AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
-            throws AMQException
+    AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                         AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
-        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
         getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Mon Jan 29 03:04:43 2007
@@ -2,6 +2,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 import javax.jms.*;
@@ -59,10 +60,10 @@
     }
 
 
-    AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
-            throws AMQException
+    AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                              AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Jan 29 03:04:43 2007
@@ -26,8 +26,8 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.*;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
@@ -66,9 +66,33 @@
         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
     }
 
-    protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
+    protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+                                 AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         this(contentHeader, deliveryTag);
+
+
+        byte type = contentHeader.getHeaders().getByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName());
+
+        AMQDestination dest;
+
+        switch(type)
+        {
+            case AMQDestination.QUEUE_TYPE:
+                dest = new AMQQueue(exchange, routingKey, routingKey);
+                break;
+            case AMQDestination.TOPIC_TYPE:
+                dest = new AMQTopic(exchange, routingKey, null);
+                break;
+            default:
+                dest = new AMQUndefinedDestination(exchange, routingKey, null);
+                break;
+        }
+        //Destination dest = AMQDestination.createDestination(url);
+        setJMSDestination(dest);
+
+
+
         _data = data;
         if (_data != null)
         {
@@ -181,7 +205,7 @@
         return _destination;
     }
 
-    public void setJMSDestination(Destination destination) throws JMSException
+    public void setJMSDestination(Destination destination)
     {
         _destination = destination;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 
@@ -36,10 +37,12 @@
 
 
     protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
-                                                                ContentHeaderBody contentHeader) throws AMQException;
+                                                        AMQShortString exchange, AMQShortString routingKey,
+                                                        ContentHeaderBody contentHeader) throws AMQException;
 
     protected AbstractJMSMessage createMessageWithBody(long messageNbr,
                                                        ContentHeaderBody contentHeader,
+                                                       AMQShortString exchange, AMQShortString routingKey,
                                                        List bodies) throws AMQException
     {
         ByteBuffer data;
@@ -54,7 +57,7 @@
             }
             data = ((ContentBody)bodies.get(0)).payload;
         }
-        else
+        else if (bodies != null)
         {
             if(debug)
             {
@@ -70,19 +73,24 @@
             }
             data.flip();
         }
+        else // bodies == null
+        {
+            data = ByteBuffer.allocate(0);
+        }
         if(debug)
         {
             _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
         }
 
-        return createMessage(messageNbr, data, contentHeader);
+        return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
     }
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
                                             ContentHeaderBody contentHeader,
+                                            AMQShortString exchange, AMQShortString routingKey,
                                             List bodies) throws JMSException, AMQException
     {
-        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies);
+        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
         msg.setJMSRedelivered(redelivered);
         return msg;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Jan 29 03:04:43 2007
@@ -56,10 +56,10 @@
         super(data); // this instanties a content header
     }
 
-    JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
-            throws AMQException
+    JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                    AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
     }
 
     public void reset()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,14 +23,17 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 
 public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey,
+                                               ContentHeaderBody contentHeader) throws AMQException
     {
-        return new JMSBytesMessage(deliveryTag, contentHeader, data);
+        return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
 
     public AbstractJMSMessage createMessage() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Mon Jan 29 03:04:43 2007
@@ -32,7 +32,7 @@
 
 public final class JMSHeaderAdapter
 {
-    FieldTable _headers;
+    private final FieldTable _headers;
 
     public JMSHeaderAdapter(FieldTable headers)
     {
@@ -319,6 +319,13 @@
         getHeaders().setByte(string, b);
     }
 
+    public void setByte(AMQShortString string, byte b) throws JMSException
+    {
+        checkPropertyName(string);
+        getHeaders().setByte(string, b);
+    }
+
+
     public void setShort(String string, short i) throws JMSException
     {
         checkPropertyName(string);
@@ -326,6 +333,12 @@
     }
 
     public void setInteger(String string, int i) throws JMSException
+    {
+        checkPropertyName(string);
+        getHeaders().setInteger(string, i);
+    }
+
+    public void setInteger(AMQShortString string, int i) throws JMSException
     {
         checkPropertyName(string);
         getHeaders().setInteger(string, i);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Mon Jan 29 03:04:43 2007
@@ -54,10 +54,10 @@
     }
 
 
-    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
-            throws AMQException
+    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                  AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
         try
         {
             populateMapFromData();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,6 +22,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
@@ -33,8 +34,10 @@
         return new JMSMapMessage();
     }
 
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
     {
-        return new JMSMapMessage(deliveryTag, contentHeader, data);
+        return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Mon Jan 29 03:04:43 2007
@@ -62,9 +62,10 @@
     /**
      * Creates read only message for delivery to consumers
      */
-    JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException
+    JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                     AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
     }
 
     public void clearBodyImpl() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,14 +23,17 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 
 public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
     {
-        return new JMSObjectMessage(deliveryTag, contentHeader, data);
+        return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
 
     public AbstractJMSMessage createMessage() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Jan 29 03:04:43 2007
@@ -61,10 +61,10 @@
     }
 
 
-    JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
-            throws AMQException
+    JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                     AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
     }
 
     public void reset()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,16 +22,18 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 
 public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws
-                                                                                                                   AMQException
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey,
+                                               ContentHeaderBody contentHeader) throws AMQException
     {
-        return new JMSStreamMessage(deliveryTag, contentHeader, data);
+        return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
 
     public AbstractJMSMessage createMessage() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Mon Jan 29 03:04:43 2007
@@ -42,7 +42,7 @@
      * This constant represents the name of a property that is set when the message payload is null.
      */
     private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
-    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
+    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
     public JMSTextMessage() throws JMSException
     {
@@ -56,10 +56,11 @@
         getContentHeaderProperties().setEncoding(encoding);
     }
 
-    JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+    JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+                   AMQShortString routingKey, ByteBuffer data)
             throws AMQException
     {
-        super(deliveryTag, contentHeader, data);
+        super(deliveryTag, contentHeader, exchange, routingKey, data);
         contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
         _data = data;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 
@@ -35,8 +36,11 @@
         return new JMSTextMessage();
     }
 
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
     {
-        return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, data);
+        return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, 
+                                  exchange, routingKey, data);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import java.util.List;
@@ -31,6 +32,7 @@
 {
     AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                      ContentHeaderBody contentHeader,
+                                     AMQShortString exchange, AMQShortString routingKey,
                                      List bodies)
         throws JMSException, AMQException;
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Mon Jan 29 03:04:43 2007
@@ -63,6 +63,8 @@
      * @throws JMSException
      */
     public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+                                            AMQShortString exchange,
+                                            AMQShortString routingKey,
                                             ContentHeaderBody contentHeader,
                                             List bodies) throws AMQException, JMSException
     {
@@ -74,7 +76,7 @@
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,8 @@
 
 import java.util.List;
 import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Collections;
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the
@@ -38,27 +40,93 @@
 {
     private long _bytesReceived = 0;
 
-    public BasicDeliverBody deliverBody;
-    public BasicReturnBody bounceBody; // TODO: check change (gustavo)
-    public int channelId;
-    public ContentHeaderBody contentHeader;
+    private final BasicDeliverBody _deliverBody;
+    private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
+    private final int _channelId;
+    private ContentHeaderBody _contentHeader;
 
     /**
      * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
      */
-    public List bodies = new LinkedList();
+    private List<ContentBody> _bodies;
+
+    public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+    {
+        _deliverBody = deliverBody;
+        _channelId = channelId;
+        _bounceBody = null;
+    }
+
+
+    public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
+    {
+        _deliverBody = null;
+        _channelId = channelId;
+        _bounceBody = bounceBody;
+    }
 
     public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
     {
-        bodies.add(body);
+
         if (body.payload != null)
         {
-            _bytesReceived += body.payload.remaining();
+            final long payloadSize = body.payload.remaining();
+
+            if(_bodies == null)
+            {
+                if(payloadSize == getContentHeader().bodySize)
+                {
+                    _bodies = Collections.singletonList(body);
+                }
+                else
+                {
+                    _bodies = new ArrayList<ContentBody>();
+                }
+
+            }
+            else
+            {
+                _bodies.add(body);
+            }
+            _bytesReceived += payloadSize;
         }
     }
 
     public boolean isAllBodyDataReceived()
     {
-        return _bytesReceived == contentHeader.bodySize;
+        return _bytesReceived == getContentHeader().bodySize;
+    }
+
+    public BasicDeliverBody getDeliverBody()
+    {
+        return _deliverBody;
+    }
+
+    public BasicReturnBody getBounceBody()
+    {
+        return _bounceBody;
     }
+
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+
+    public ContentHeaderBody getContentHeader()
+    {
+        return _contentHeader;
+    }
+
+    public void setContentHeader(ContentHeaderBody contentHeader)
+    {
+        this._contentHeader = contentHeader;
+    }
+
+    public List<ContentBody> getBodies()
+    {
+        return _bodies;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 29 03:04:43 2007
@@ -283,72 +283,87 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
+        final boolean debug = _logger.isDebugEnabled();
         final long msgNumber = ++_messageReceivedCount;
 
-        if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0))
+        if (debug && (msgNumber % 1000 == 0))
         {
             _logger.debug("Received " + _messageReceivedCount + " protocol messages");
         }
 
         AMQFrame frame = (AMQFrame) message;
 
-        HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+        final AMQBody bodyFrame = frame.getBodyFrame();
+
+        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-        if (frame.bodyFrame instanceof AMQMethodBody)
+        switch(bodyFrame.getFrameType())
         {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Method frame received: " + frame);
-            }
+            case AMQMethodBody.TYPE:
 
-            final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
+                if (debug)
+                {
+                    _logger.debug("Method frame received: " + frame);
+                }
 
-            try
-            {
+                final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
-                boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                if (!_frameListeners.isEmpty())
+                try
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+
+                    boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) it.next();
-                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = (AMQMethodListener) it.next();
+                            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                        }
+                    }
+                    if (!wasAnyoneInterested)
+                    {
+                        throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners);
                     }
                 }
-                if (!wasAnyoneInterested)
-                {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners);
-                }
-            }
-            catch (AMQException e)
-            {
-                getStateManager().error(e);
-                if (!_frameListeners.isEmpty())
+                catch (AMQException e)
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+                    getStateManager().error(e);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) it.next();
-                        listener.error(e);
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = (AMQMethodListener) it.next();
+                            listener.error(e);
+                        }
                     }
+                    exceptionCaught(session, e);
                 }
-                exceptionCaught(session, e);
-            }
-        }
-        else if (frame.bodyFrame instanceof ContentHeaderBody)
-        {
-            _protocolSession.messageContentHeaderReceived(frame.channel,
-                                                          (ContentHeaderBody) frame.bodyFrame);
-        }
-        else if (frame.bodyFrame instanceof ContentBody)
-        {
-            _protocolSession.messageContentBodyReceived(frame.channel,
-                                                        (ContentBody) frame.bodyFrame);
-        }
-        else if (frame.bodyFrame instanceof HeartbeatBody)
-        {
-            _logger.debug("Received heartbeat");
+                break;
+
+            case ContentHeaderBody.TYPE:
+
+                _protocolSession.messageContentHeaderReceived(frame.getChannel(),
+                                                              (ContentHeaderBody) bodyFrame);
+                break;
+
+            case ContentBody.TYPE:
+
+                _protocolSession.messageContentBodyReceived(frame.getChannel(),
+                                                            (ContentBody) bodyFrame);
+                break;
+            
+            case HeartbeatBody.TYPE:
+
+                if(debug)
+                {
+                    _logger.debug("Received heartbeat");
+                }
+                break;
+
+            default:
+
         }
         _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
     }
@@ -467,7 +482,7 @@
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
     {
         return writeCommandFrameAndWaitForReply(frame,
-                                                new SpecificMethodFrameListener(frame.channel, responseClass), timeout);
+                                                new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 29 03:04:43 2007
@@ -32,7 +32,7 @@
 import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.commons.lang.StringUtils;
 
@@ -47,7 +47,7 @@
  * The underlying protocol session is still available but clients should not
  * use it to obtain session attributes.
  */
-public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList
+public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
 {
 
     protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -95,8 +95,7 @@
 
     private byte _protocolMinorVersion;
     private byte _protocolMajorVersion;
-
-
+    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
 
 
     /**
@@ -125,6 +124,7 @@
     {
         _protocolHandler = protocolHandler;
         _minaProtocolSession = protocolSession;
+        _minaProtocolSession.setAttachment(this);
         // properties of the connection are made available to the event handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
         //fixme - real value needed
@@ -239,7 +239,7 @@
      */
     public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
     {
-        _channelId2UnprocessedMsgMap.put(message.channelId, message);
+        _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
     }
 
     public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
@@ -250,11 +250,11 @@
         {
             throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
         }
-        if (msg.contentHeader != null)
+        if (msg.getContentHeader() != null)
         {
             throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
         }
-        msg.contentHeader = contentHeader;
+        msg.setContentHeader(contentHeader);
         if (contentHeader.bodySize == 0)
         {
             deliverMessageToAMQSession(channelId, msg);
@@ -268,7 +268,7 @@
         {
             throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
         }
-        if (msg.contentHeader == null)
+        if (msg.getContentHeader() == null)
         {
             _channelId2UnprocessedMsgMap.remove(channelId);
             throw new AMQException("Error: received content body without having received a ContentHeader frame first");
@@ -465,11 +465,11 @@
         session.confirmConsumerCancelled(consumerTag);
     }
 
-    public void setProtocolVersion(byte versionMajor, byte versionMinor)
+    public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
     {
         _protocolMajorVersion = versionMajor;
         _protocolMinorVersion = versionMinor;
-
+        _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);        
     }
 
     public byte getProtocolMinorVersion()
@@ -480,6 +480,11 @@
     public byte getProtocolMajorVersion()
     {
         return _protocolMajorVersion;
+    }
+
+    public VersionSpecificRegistry getRegistry()
+    {
+        return _registry;
     }
 
 }