You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/06 16:29:04 UTC

svn commit: r573282 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ handler/ message/ protocol/

Author: rajith
Date: Thu Sep  6 07:29:03 2007
New Revision: 573282

URL: http://svn.apache.org/viewvc?rev=573282&view=rev
Log:
Unprocessed message was made abstract and a 0-8 and 0-10 implementation is provided.
The return message extends unprocessed_msg_0_8 as return message is only a 0-8 feature.


Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Sep  6 07:29:03 2007
@@ -71,79 +71,22 @@
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -1269,19 +1212,17 @@
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Message["
-                          + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
-                          + "] received in session with channel id " + _channelId);
+            _logger.debug("Message[" + message.toString() + "] received in session");
         }
 
-        if (message.getDeliverBody() == null)
+        if (message instanceof ReturnMessage)
         {
             // Return of the bounced message.
-            returnBouncedMessage(message);
+            returnBouncedMessage((ReturnMessage)message);
         }
         else
         {
-            _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
+            _highestDeliveryTag.set(message.getDeliveryTag());
             _queue.add(message);
         }
     }
@@ -1374,10 +1315,10 @@
 
         if (_logger.isTraceEnabled())
         {
-            _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+            _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag());
         }
 
-        rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+        rejectMessage(message.getDeliveryTag(), requeue);
     }
 
     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
@@ -2320,12 +2261,12 @@
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag))
+            if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag))
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                                  + message.getDeliverBody().deliveryTag);
+                                  + message.getDeliveryTag());
                 }
 
                 messages.remove();
@@ -2334,7 +2275,7 @@
 
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+                    _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
                 }
             }
         }
@@ -2363,7 +2304,7 @@
         }
     }
 
-    private void returnBouncedMessage(final UnprocessedMessage message)
+    private void returnBouncedMessage(final ReturnMessage msg)
     {
         _connection.performConnectionTask(new Runnable()
         {
@@ -2373,11 +2314,11 @@
                 {
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
-                            _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
-                                                                  message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+                                                                  msg.getExchange(), msg.getContentHeader(), msg.getBodies());
 
-                    AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
-                    AMQShortString reason = message.getBounceBody().replyText;
+                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                    AMQShortString reason = msg.getReplyText();
                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -2565,7 +2506,7 @@
                             _lock.wait();
                         }
 
-                        if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+                        if (message.getDeliveryTag() <= _rollbackMark.get())
                         {
                             rejectMessage(message, true);
                         }
@@ -2619,10 +2560,11 @@
 
         private void dispatchMessage(UnprocessedMessage message)
         {
-            if (message.getDeliverBody() != null)
-            {
+            //This if block is not needed anymore as bounce messages are handled separately
+            //if (message.getDeliverBody() != null)
+            //{
                 final BasicMessageConsumer consumer =
-                        (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+                        (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
 
                 if ((consumer == null) || consumer.isClosed())
                 {
@@ -2631,13 +2573,13 @@
                         if (consumer == null)
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliverBody().deliveryTag + "] from queue "
-                                                   + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+                                                   + message.getDeliveryTag() + "] from queue "
+                                                   + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                         }
                         else
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
                                                    + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
                         }
                     }
@@ -2652,7 +2594,7 @@
                     consumer.notifyMessage(message, _channelId);
                 }
             }
-        }
+        //}
     }
 
     /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Sep  6 07:29:03 2007
@@ -44,7 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
@@ -74,7 +74,7 @@
      */
     private final ArrayBlockingQueue _synchronousQueue;
 
-    private MessageFactoryRegistry _messageFactory;
+    protected MessageFactoryRegistry _messageFactory;
 
     private final AMQSession _session;
 
@@ -543,16 +543,12 @@
 
         if (debug)
         {
-            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
         }
 
         try
         {
-            AbstractJMSMessage jmsMessage =
-                _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
-                    messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
-                    messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
-
+            AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
             if (debug)
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
@@ -589,6 +585,8 @@
             }
         }
     }
+
+    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H,B> messageFrame)throws Exception;
 
     /**
      * @param jmsMessage this message has already been processed so can't redo preDeliver

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Sep  6 07:29:03 2007
@@ -22,16 +22,20 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicCancelBody;
 import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
 {
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
@@ -70,5 +74,14 @@
         {
             throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
         }
+    }
+
+    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody,ContentBody> messageFrame)throws Exception
+    {
+
+        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+            messageFrame.isRedelivered(), messageFrame.getExchange(),
+            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Sep  6 07:29:03 2007
@@ -21,13 +21,12 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicDeliverBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +44,14 @@
     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
         throws AMQException
     {
-        final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
+        BasicDeliverBody deliveryBody = (BasicDeliverBody) evt.getMethod();
+        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
+                           evt.getChannelId(),
+                           deliveryBody.deliveryTag,
+                           deliveryBody.consumerTag.asString(),
+                           deliveryBody.getExchange(),
+                           deliveryBody.getRoutingKey(),
+                           deliveryBody.getRedelivered());
         _logger.debug("New JmsDeliver method received");
         protocolSession.unprocessedMessageReceived(msg);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Sep  6 07:29:03 2007
@@ -21,13 +21,12 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +44,14 @@
     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
         throws AMQException
     {
+        BasicReturnBody returnBody = (BasicReturnBody)evt.getMethod();
         _logger.debug("New JmsBounce method received");
-        final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicReturnBody) evt.getMethod());
+        final ReturnMessage msg = new ReturnMessage(evt.getChannelId(),
+                                                    returnBody.getExchange(),
+                                                    returnBody.getRoutingKey(),
+                                                    returnBody.getReplyText(),
+                                                    returnBody.getReplyCode()
+                                                    );
 
         protocolSession.unprocessedMessageReceived(msg);
     }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Sep  6 07:29:03 2007
@@ -0,0 +1,26 @@
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class ReturnMessage extends UnprocessedMessage_0_8
+{
+    final private AMQShortString  _replyText;
+    final private int _replyCode;
+
+    public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+    {
+        super(channelId,-1,"",exchange,routingKey,false);
+        _replyText = replyText;
+        _replyCode = replyCode;
+    }
+
+    public int getReplyCode()
+    {
+        return _replyCode;
+    }
+
+    public AMQShortString getReplyText()
+    {
+        return _replyText;
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Sep  6 07:29:03 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,10 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -36,96 +32,60 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public class UnprocessedMessage
+public abstract class UnprocessedMessage<H,B>
 {
-    private long _bytesReceived = 0;
-
-    private final BasicDeliverBody _deliverBody;
-    private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
     private final int _channelId;
-    private ContentHeaderBody _contentHeader;
-
-    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
-    private List<ContentBody> _bodies;
+    private final long _deliveryId;
+    private final String _consumerTag;
+    protected AMQShortString _exchange;
+    protected AMQShortString _routingKey;
+    protected boolean _redelivered;
 
-    public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+    public UnprocessedMessage(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
     {
-        _deliverBody = deliverBody;
         _channelId = channelId;
-        _bounceBody = null;
+        _deliveryId = deliveryId;
+        _consumerTag = consumerTag;
+        _exchange = exchange;
+        _routingKey = routingKey;
+        _redelivered = redelivered;
     }
 
+    public abstract void receiveBody(B nativeMessageBody);
 
-    public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
-    {
-        _deliverBody = null;
-        _channelId = channelId;
-        _bounceBody = bounceBody;
-    }
-
-    public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
-    {
-
-        if (body.payload != null)
-        {
-            final long payloadSize = body.payload.remaining();
-
-            if (_bodies == null)
-            {
-                if (payloadSize == getContentHeader().bodySize)
-                {
-                    _bodies = Collections.singletonList(body);
-                }
-                else
-                {
-                    _bodies = new ArrayList<ContentBody>();
-                    _bodies.add(body);
-                }
-
-            }
-            else
-            {
-                _bodies.add(body);
-            }
-            _bytesReceived += payloadSize;
-        }
-    }
+    public abstract void setContentHeader(H nativeMessageHeader);
 
-    public boolean isAllBodyDataReceived()
+    public int getChannelId()
     {
-        return _bytesReceived == getContentHeader().bodySize;
+        return _channelId;
     }
 
-    public BasicDeliverBody getDeliverBody()
+    public long getDeliveryTag()
     {
-        return _deliverBody;
+        return _deliveryId;
     }
 
-    public BasicReturnBody getBounceBody()
+    public String getConsumerTag()
     {
-        return _bounceBody;
+        return _consumerTag;
     }
 
-
-    public int getChannelId()
+    public AMQShortString getExchange()
     {
-        return _channelId;
+        return _exchange;
     }
 
-
-    public ContentHeaderBody getContentHeader()
+    public AMQShortString getRoutingKey()
     {
-        return _contentHeader;
+        return _routingKey;
     }
 
-    public void setContentHeader(ContentHeaderBody contentHeader)
+    public boolean isRedelivered()
     {
-        this._contentHeader = contentHeader;
+        return _redelivered;
     }
 
-    public List<ContentBody> getBodies()
-    {
-        return _bodies;
-    }
+    public abstract List<B> getBodies();
 
+    public abstract H getContentHeader();
 }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Sep  6 07:29:03 2007
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.Struct;
+
+/**
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+{
+    private Struct[] _headers;
+
+    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
+    private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
+
+    public UnprocessedMessage_0_10(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    {
+        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+    }
+
+    public void receiveBody(ByteBuffer body)
+    {
+
+        _bodies.add(body);
+    }
+
+    public void setContentHeader(Struct[] headers)
+    {
+        this._headers = headers;
+        for(Struct s: headers)
+        {
+            if (s instanceof DeliveryProperties)
+            {
+                DeliveryProperties props = (DeliveryProperties)s;
+                _exchange = new AMQShortString(props.getExchange());
+                _routingKey = new AMQShortString(props.getRoutingKey());
+                _redelivered = props.getRedelivered();
+            }
+        }
+    }
+
+    public Struct[] getContentHeader()
+    {
+        return _headers;
+    }
+
+    public List<ByteBuffer> getBodies()
+    {
+        return _bodies;
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Sep  6 07:29:03 2007
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+{
+    private long _bytesReceived = 0;
+
+    private BasicDeliverBody _deliverBody;
+    private ContentHeaderBody _contentHeader;
+
+    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
+    private List<ContentBody> _bodies;
+
+    public UnprocessedMessage_0_8(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    {
+        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+    }
+
+    public void receiveBody(ContentBody body)
+    {
+
+        if (body.payload != null)
+        {
+            final long payloadSize = body.payload.remaining();
+
+            if (_bodies == null)
+            {
+                if (payloadSize == getContentHeader().bodySize)
+                {
+                    _bodies = Collections.singletonList(body);
+                }
+                else
+                {
+                    _bodies = new ArrayList<ContentBody>();
+                    _bodies.add(body);
+                }
+
+            }
+            else
+            {
+                _bodies.add(body);
+            }
+            _bytesReceived += payloadSize;
+        }
+    }
+
+    public void setMethodBody(BasicDeliverBody deliverBody)
+    {
+        _deliverBody = deliverBody;
+    }
+
+    public void setContentHeader(ContentHeaderBody contentHeader)
+    {
+        this._contentHeader = contentHeader;
+    }
+
+    public boolean isAllBodyDataReceived()
+    {
+        return _bytesReceived == getContentHeader().bodySize;
+    }
+
+    public BasicDeliverBody getDeliverBody()
+    {
+        return _deliverBody;
+    }
+
+    public ContentHeaderBody getContentHeader()
+    {
+        return _contentHeader;
+    }
+
+    public List<ContentBody> getBodies()
+    {
+        return _bodies;
+    }
+
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Channel Id : " + this.getChannelId());
+        if (_contentHeader != null)
+        {
+          buf.append("ContentHeader " + _contentHeader);
+        }
+        if(_deliverBody != null)
+        {
+            buf.append("Delivery tag " + _deliverBody.deliveryTag);
+            buf.append("Consumer tag " + _deliverBody.consumerTag);
+            buf.append("Deliver Body " + _deliverBody);
+        }
+
+        return buf.toString();
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Sep  6 07:29:03 2007
@@ -32,7 +32,9 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
 // import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
+import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQShortString;
@@ -93,7 +95,7 @@
      * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
      * first) with the subsequent content header and content bodies.
      */
-    protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+    protected ConcurrentMap<Integer,UnprocessedMessage_0_8> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage_0_8>();
 
     /** Counter to ensure unique queue names */
     protected int _queueId = 1;
@@ -228,14 +230,14 @@
      *
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+    public void unprocessedMessageReceived(UnprocessedMessage_0_8 message) throws AMQException
     {
         _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
     }
 
     public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
     {
-        UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+        UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
         if (msg == null)
         {
             throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
@@ -255,7 +257,7 @@
 
     public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
     {
-        UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+        UnprocessedMessage_0_8 msg = _channelId2UnprocessedMsgMap.get(channelId);
         if (msg == null)
         {
             throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);