You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/13 14:35:51 UTC

svn commit: r575289 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ message/

Author: arnaudsimon
Date: Thu Sep 13 05:35:48 2007
New Revision: 575289

URL: http://svn.apache.org/viewvc?rev=575289&view=rev
Log:
updated message hierarchy for using 0_10 messages

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Sep 13 05:35:48 2007
@@ -112,7 +112,7 @@
                     int defaultPrefetchLow)
     {
 
-        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(),
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
              defaultPrefetchHigh, defaultPrefetchLow);
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Sep 13 05:35:48 2007
@@ -100,7 +100,7 @@
     AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
                int defaultPrefetchLow)
     {
-        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault08Registry(), defaultPrefetchHigh,
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
              defaultPrefetchLow);
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Sep 13 05:35:48 2007
@@ -39,18 +39,18 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
-    /** The connection being used by this consumer */
+    /**
+     * The connection being used by this consumer
+     */
     private AMQConnection _connection;
 
     private String _messageSelector;
@@ -59,15 +59,23 @@
 
     private AMQDestination _destination;
 
-    /** When true indicates that a blocking receive call is in progress */
+    /**
+     * When true indicates that a blocking receive call is in progress
+     */
     private final AtomicBoolean _receiving = new AtomicBoolean(false);
-    /** Holds an atomic reference to the listener installed. */
+    /**
+     * Holds an atomic reference to the listener installed.
+     */
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
-    /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
+    /**
+     * The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker
+     */
     protected AMQShortString _consumerTag;
 
-    /** We need to know the channel id when constructing frames */
+    /**
+     * We need to know the channel id when constructing frames
+     */
     protected int _channelId;
 
     /**
@@ -82,7 +90,9 @@
 
     protected AMQProtocolHandler _protocolHandler;
 
-    /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
+    /**
+     * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+     */
     private FieldTable _rawSelectorFieldTable;
 
     /**
@@ -97,7 +107,9 @@
      */
     private int _prefetchLow;
 
-    /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
+    /**
+     * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+     */
     private boolean _exclusive;
 
     /**
@@ -107,7 +119,9 @@
      */
     private int _acknowledgeMode;
 
-    /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
+    /**
+     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+     */
     private int _outstanding;
 
     /**
@@ -118,7 +132,9 @@
 
     private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
-    /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+    /**
+     * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
+     */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
     /**
@@ -138,9 +154,10 @@
     private List<StackTraceElement> _closedStack = null;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
-        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-        AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
-        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+                                   String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+                                   AMQSession session, AMQProtocolHandler protocolHandler,
+                                   FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                   boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
         _connection = connection;
@@ -216,8 +233,8 @@
 
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
-                    + _destination);
+                _logger.debug(
+                        "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
             }
         }
         else
@@ -418,9 +435,7 @@
      * it by throwing it (if an exception) or returning it (in any other case).
      *
      * @param o
-     *
      * @return a message only if o is a Message
-     *
      * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
      *                      JMSException is created with the linked exception set appropriately
      */
@@ -465,8 +480,8 @@
                 {
                     if (_closedStack != null)
                     {
-                        _logger.trace(_consumerTag + " close():"
-                            + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                        _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace())
+                                .subList(3, 6));
                         _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
                     }
                     else
@@ -518,8 +533,8 @@
             {
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " markClosed():"
-                        + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                    _logger.trace(_consumerTag + " markClosed():" + Arrays
+                            .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                     _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
                 }
                 else
@@ -588,14 +603,9 @@
         }
     }
 
-    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
-     {
+    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+            throws Exception;
 
-        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
-            messageFrame.isRedelivered(), messageFrame.getExchange(),
-            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
-
-    }
 
     /**
      * @param jmsMessage this message has already been processed so can't redo preDeliver
@@ -643,77 +653,79 @@
         switch (_acknowledgeMode)
         {
 
-        case Session.PRE_ACKNOWLEDGE:
-            _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            break;
+            case Session.PRE_ACKNOWLEDGE:
+                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                break;
 
-        case Session.CLIENT_ACKNOWLEDGE:
-            // we set the session so that when the user calls acknowledge() it can call the method on session
-            // to send out the appropriate frame
-            msg.setAMQSession(_session);
-            break;
+            case Session.CLIENT_ACKNOWLEDGE:
+                // we set the session so that when the user calls acknowledge() it can call the method on session
+                // to send out the appropriate frame
+                msg.setAMQSession(_session);
+                break;
         }
     }
 
-     void postDeliver(AbstractJMSMessage msg) throws JMSException
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
     {
         msg.setJMSDestination(_destination);
         switch (_acknowledgeMode)
         {
 
-        case Session.CLIENT_ACKNOWLEDGE:
-            if (isNoConsume())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
 
-        case Session.DUPS_OK_ACKNOWLEDGE:
-            if (++_outstanding >= _prefetchHigh)
-            {
-                _dups_ok_acknowledge_send = true;
-            }
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                if (++_outstanding >= _prefetchHigh)
+                {
+                    _dups_ok_acknowledge_send = true;
+                }
 
-            if (_outstanding <= _prefetchLow)
-            {
-                _dups_ok_acknowledge_send = false;
-            }
+                if (_outstanding <= _prefetchLow)
+                {
+                    _dups_ok_acknowledge_send = false;
+                }
 
-            if (_dups_ok_acknowledge_send)
-            {
-                if (!_session.isInRecovery())
+                if (_dups_ok_acknowledge_send)
                 {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    if (!_session.isInRecovery())
+                    {
+                        _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    }
                 }
-            }
 
-            break;
+                break;
 
-        case Session.AUTO_ACKNOWLEDGE:
-            // we do not auto ack a message if the application code called recover()
-            if (!_session.isInRecovery())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.AUTO_ACKNOWLEDGE:
+                // we do not auto ack a message if the application code called recover()
+                if (!_session.isInRecovery())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
 
-        case Session.SESSION_TRANSACTED:
-            if (isNoConsume())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
-            else
-            {
-                _receivedDeliveryTags.add(msg.getDeliveryTag());
-            }
+            case Session.SESSION_TRANSACTED:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _receivedDeliveryTags.add(msg.getDeliveryTag());
+                }
 
-            break;
+                break;
         }
     }
 
-    /** Acknowledge up to last message delivered (if any). Used when commiting. */
+    /**
+     * Acknowledge up to last message delivered (if any). Used when commiting.
+     */
     void acknowledgeLastDelivered()
     {
         if (!_receivedDeliveryTags.isEmpty())
@@ -740,8 +752,8 @@
             {
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " notifyError():"
-                        + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                    _logger.trace(_consumerTag + " notifyError():" + Arrays
+                            .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                     _logger.trace(_consumerTag + " previously" + _closedStack.toString());
                 }
                 else
@@ -819,7 +831,9 @@
         }
     }
 
-    /** Called on recovery to reset the list of delivery tags */
+    /**
+     * Called on recovery to reset the list of delivery tags
+     */
     public void clearUnackedMessages()
     {
         _unacknowledgedDeliveryTags.clear();
@@ -860,8 +874,8 @@
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+                        .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
             }
 
             Long tag = _receivedDeliveryTags.poll();
@@ -890,8 +904,8 @@
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                _logger.debug("Rejecting the messages(" + _synchronousQueue
+                        .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
             }
 
             Iterator iterator = _synchronousQueue.iterator();
@@ -914,8 +928,8 @@
                 }
                 else
                 {
-                    _logger.error("Queue contained a :" + o.getClass()
-                        + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+                    _logger.error("Queue contained a :" + o
+                            .getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                     iterator.remove();
                 }
             }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Sep 13 05:35:48 2007
@@ -32,11 +32,12 @@
 
 import javax.jms.JMSException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * This is a 0.10 message consumer.
  */
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
         implements org.apache.qpidity.client.util.MessageListener
 {
     /**
@@ -108,5 +109,12 @@
         ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
         super.postDeliver(msg);
     }
-  
+
+
+    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer>  messageFrame) throws Exception
+    {
+        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+            messageFrame.isRedelivered(), messageFrame.getExchange(),
+            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Sep 13 05:35:48 2007
@@ -75,4 +75,13 @@
             throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
         }
     }
+
+     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+     {
+
+        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+            messageFrame.isRedelivered(), messageFrame.getExchange(),
+            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Sep 13 05:35:48 2007
@@ -21,19 +21,19 @@
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpidity.jms.message.MessageImpl;
-import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpidity.jms.ExceptionHelper;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.ReplyTo;
 
 import javax.jms.Message;
 import javax.jms.JMSException;
-import java.util.UUID;
 import java.io.IOException;
 
 /**
- *
- *  This is a 0_10 message producer. 
+ * This is a 0_10 message producer.
  */
 public class BasicMessageProducer_0_10 extends BasicMessageProducer
 {
@@ -66,85 +66,79 @@
 
     //--- Overwritten methods
 
-
     /**
      * Sends a message to a given destination
-     * We will always convert the received message
      */
     public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                             int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
                             boolean wait) throws JMSException
     {
-        // Only get current time if required
-        long currentTime = Long.MIN_VALUE;
-        if (!((timeToLive == 0) && _disableTimestamps))
-        {
-            currentTime = System.currentTimeMillis();
-        }
-        // the messae UID
-        String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
-        MessageImpl qpidMessage;
-        // check that the message is not a foreign one
+        message.prepareForSending();
+        org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage();
+        // set the payload
         try
         {
-            qpidMessage = (MessageImpl) origMessage;
+            qpidityMessage.appendData(message.getData().buf());
+        }
+        catch (IOException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
-        catch (ClassCastException cce)
+        // set the delivery properties
+        if (!_disableTimestamps)
         {
-            // this is a foreign message
-            qpidMessage = MessageHelper.transformMessage(origMessage);
-            // set message's properties in case they are queried after send.
-            origMessage.setJMSDestination(destination);
-            origMessage.setJMSDeliveryMode(deliveryMode);
-            origMessage.setJMSPriority(priority);
-            origMessage.setJMSMessageID(uid);
-            if (timeToLive != 0)
+            final long currentTime = System.currentTimeMillis();
+            qpidityMessage.getDeliveryProperties().setTimestamp(currentTime);
+            if (timeToLive > 0)
             {
-                origMessage.setJMSExpiration(timeToLive + currentTime);
-                _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+                qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive);
             }
             else
             {
-                origMessage.setJMSExpiration(timeToLive);
+                qpidityMessage.getDeliveryProperties().setExpiration(0);
             }
-            origMessage.setJMSTimestamp(currentTime);
         }
-        // set the message properties
-        qpidMessage.setJMSDestination(destination);
-        qpidMessage.setJMSMessageID(uid);
-        qpidMessage.setJMSDeliveryMode(deliveryMode);
-        qpidMessage.setJMSPriority(priority);
-        if (timeToLive != 0)
+        qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
+        qpidityMessage.getDeliveryProperties().setPriority((byte) priority);
+        qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString());
+        qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
+        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+        // set the application properties
+        qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString());
+        qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString());
+        String replyToURL = contentHeaderProperties.getReplyToAsString();
+        if (replyToURL != null)
         {
-            qpidMessage.setJMSExpiration(timeToLive + currentTime);
-        }
-        else
-        {
-            qpidMessage.setJMSExpiration(timeToLive);
-        }
-        qpidMessage.setJMSTimestamp(currentTime);
-        qpidMessage.setRoutingKey(destination.getDestinationName().toString());
-        qpidMessage.setExchangeName(destination.getExchangeName().toString());
-        // call beforeMessageDispatch
-        try
-        {
-            qpidMessage.beforeMessageDispatch();
+            AMQBindingURL dest;
+            try
+            {
+                dest = new AMQBindingURL(replyToURL);
+            }
+            catch (URLSyntaxException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            }
+            qpidityMessage.getMessageProperties()
+                    .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
         }
-        catch (QpidException e)
+        if (contentHeaderProperties.getHeaders() != null)
         {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+            // todo use the new fieldTable
+            qpidityMessage.getMessageProperties().setApplicationHeaders(null);
         }
+        // send the message 
         try
         {
-            ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
-                                                                              qpidMessage.getQpidityMessage(),
+            ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
+                                                                              qpidityMessage,
                                                                               org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                                                               org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
         }
         catch (IOException e)
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-        }       
+        }
+
     }
 }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Sep 13 05:35:48 2007
@@ -72,11 +72,11 @@
         _data.setAutoExpand(true);
     }
 
-    AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+    AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
         AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
-        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
         getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Thu Sep 13 05:35:48 2007
@@ -34,6 +34,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 /**
  * @author Apache Software Foundation
@@ -86,7 +87,7 @@
     }
 
 
-    AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+    AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
                               AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         super(messageNbr, contentHeader, exchange, routingKey, data);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,6 +26,10 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.DeliveryProperties;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,10 +44,12 @@
     private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
 
     protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
-        AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
+                                                        AMQShortString routingKey,
+                                                        BasicContentHeaderProperties contentHeader) throws AMQException;
 
-    protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
-        AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
+    protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+                                                         AMQShortString exchange, AMQShortString routingKey,
+                                                         List bodies) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
@@ -62,8 +68,8 @@
         {
             if (debug)
             {
-                _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
-                    + ")");
+                _logger.debug("Fragmented message body (" + bodies
+                        .size() + " frames, bodySize=" + contentHeader.bodySize + ")");
             }
 
             data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
@@ -84,17 +90,71 @@
 
         if (debug)
         {
-            _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
-                + data.remaining());
+            _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+                    .remaining());
         }
 
-        return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
+        return createMessage(messageNbr, data, exchange, routingKey,
+                             (BasicContentHeaderProperties) contentHeader.properties);
     }
 
+    protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
+                                                          AMQShortString exchange, AMQShortString routingKey,
+                                                          List bodies) throws AMQException
+    {
+        ByteBuffer data;
+        final boolean debug = _logger.isDebugEnabled();
+
+        // we optimise the non-fragmented case to avoid copying
+        if ((bodies != null))
+        {
+            data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+        }
+        else // bodies == null
+        {
+            data = ByteBuffer.allocate(0);
+        }
+
+        if (debug)
+        {
+            _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+                    .remaining());
+        }
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        // set the properties of this message
+        MessageProperties mprop = (MessageProperties) contentHeader[0];
+        DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
+        props.setContentType(mprop.getContentType());
+        props.setCorrelationId(mprop.getCorrelationId());
+        props.setEncoding(mprop.getContentEncoding());
+        props.setExpiration(devprop.getExpiration());
+        // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
+        props.setMessageId(mprop.getMessageId());
+        props.setPriority((byte) devprop.getPriority());
+        // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+        props.setTimestamp(devprop.getTimestamp());
+        props.setType(mprop.getType());
+        props.setUserId(mprop.getUserId());
+        return createMessage(messageNbr, data, exchange, routingKey, props);
+    }
+
+
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
-        AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
+                                            AMQShortString exchange, AMQShortString routingKey, List bodies)
+            throws JMSException, AMQException
+    {
+        final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+        msg.setJMSRedelivered(redelivered);
+
+        return msg;
+    }
+
+    public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
+                                            AMQShortString exchange, AMQShortString routingKey, List bodies)
+            throws JMSException, AMQException
     {
-        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+        final AbstractJMSMessage msg =
+                create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
         msg.setJMSRedelivered(redelivered);
 
         return msg;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Sep 13 05:35:48 2007
@@ -34,6 +34,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
 {
@@ -57,7 +58,7 @@
         super(data); // this instanties a content header
     }
 
-    JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+    JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
                     AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         super(messageNbr, contentHeader, exchange, routingKey, data);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,12 +26,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
 {
     protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
                                                AMQShortString exchange, AMQShortString routingKey,
-                                               ContentHeaderBody contentHeader) throws AMQException
+                                               BasicContentHeaderProperties contentHeader) throws AMQException
     {
         return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
@@ -40,4 +41,7 @@
     {
         return new JMSBytesMessage();
     }
+
+    // 0_10 specific
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Sep 13 05:35:48 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@
         populateMapFromData();
     }
 
-    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+    JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
         ByteBuffer data) throws AMQException
     {
         super(messageNbr, contentHeader, exchange, routingKey, data);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSMapMessageFactory extends AbstractJMSMessageFactory
 {
@@ -36,8 +37,9 @@
 
     protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
                                                AMQShortString exchange, AMQShortString routingKey, 
-                                               ContentHeaderBody contentHeader) throws AMQException
+                                               BasicContentHeaderProperties contentHeader) throws AMQException
     {
         return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Sep 13 05:35:48 2007
@@ -69,10 +69,10 @@
     /**
      * Creates read only message for delivery to consumers
      */
-    JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+    JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
         ByteBuffer data) throws AMQException
     {
-        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+        super(messageNbr, contentHeader, exchange, routingKey, data);
     }
 
     public void clearBodyImpl() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,12 +26,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
 {
     protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
                                                AMQShortString exchange, AMQShortString routingKey, 
-                                               ContentHeaderBody contentHeader) throws AMQException
+                                               BasicContentHeaderProperties contentHeader) throws AMQException
     {
         return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Sep 13 05:35:48 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 /**
  * @author Apache Software Foundation
@@ -60,7 +61,7 @@
     }
 
 
-    JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+    JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
                      AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         super(messageNbr, contentHeader, exchange, routingKey, data);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,16 +26,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
 {
     protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
                                                AMQShortString exchange, AMQShortString routingKey,
-                                               ContentHeaderBody contentHeader) throws AMQException
+                                               BasicContentHeaderProperties contentHeader) throws AMQException
     {
         return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
     }
-
     public AbstractJMSMessage createMessage() throws JMSException
     {
         return new JMSStreamMessage();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -38,9 +38,9 @@
 
     protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
                                                AMQShortString exchange, AMQShortString routingKey, 
-                                               ContentHeaderBody contentHeader) throws AMQException
+                                               BasicContentHeaderProperties contentHeader) throws AMQException
     {
-        return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, 
+        return new JMSTextMessage(deliveryTag,  contentHeader, 
                                   exchange, routingKey, data);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Sep 13 05:35:48 2007
@@ -27,12 +27,19 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
 
 
 public interface MessageFactory
 {
     AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                      ContentHeaderBody contentHeader,
+                                     AMQShortString exchange, AMQShortString routingKey,
+                                     List bodies)
+        throws JMSException, AMQException;
+
+     AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+                                     Struct[] contentHeader,
                                      AMQShortString exchange, AMQShortString routingKey,
                                      List bodies)
         throws JMSException, AMQException;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Sep 13 05:35:48 2007
@@ -30,18 +30,29 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageFactoryRegistry
 {
+    /**
+     * This class logger
+     */
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
     private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
     private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
-        new HashMap<AMQShortString, MessageFactory>();
+            new HashMap<AMQShortString, MessageFactory>();
 
     /**
      * Construct a new registry with the default message factories registered
+     *
      * @return a message factory registry
      */
-    public static MessageFactoryRegistry newDefault08Registry()
+    public static MessageFactoryRegistry newDefaultRegistry()
     {
         MessageFactoryRegistry mf = new MessageFactoryRegistry();
         mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
@@ -55,24 +66,6 @@
         return mf;
     }
 
-    /**
-     * Construct a new 010 registry with the default message factories registered
-     * @return a message factory registry
-     */
-    public static MessageFactoryRegistry newDefault010Registry()
-    {
-        // TODO use 0.10 classes 
-        MessageFactoryRegistry mf = new MessageFactoryRegistry();
-        mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
-        mf.registerFactory("text/plain", new JMSTextMessageFactory());
-        mf.registerFactory("text/xml", new JMSTextMessageFactory());
-        mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
-        mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
-        mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
-        mf.registerFactory(null, new JMSBytesMessageFactory());
-
-        return mf;
-    }
 
 
     public void registerFactory(String mimeType, MessageFactory mf)
@@ -96,25 +89,26 @@
     /**
      * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
      * concrete message type.
-     * @param deliveryTag the AMQ message id
-     * @param redelivered true if redelivered
+     *
+     * @param deliveryTag   the AMQ message id
+     * @param redelivered   true if redelivered
      * @param contentHeader the content header that was received
-     * @param bodies a list of ContentBody instances
+     * @param bodies        a list of ContentBody instances
      * @return the message.
      * @throws AMQException
      * @throws JMSException
      */
     public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
                                             AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
-                                     throws AMQException, JMSException
+            throws AMQException, JMSException
     {
         BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.
         AMQShortString contentTypeShortString = properties.getContentType();
-        contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
-                                                                  : contentTypeShortString;
+        contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(
+                JMSBytesMessage.MIME_TYPE) : contentTypeShortString;
 
         MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
         if (mf == null)
@@ -126,6 +120,29 @@
             return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
         }
     }
+
+    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+                                            AMQShortString routingKey, Struct[] contentHeader, List bodies)
+            throws AMQException, JMSException
+    {
+        MessageProperties mprop = (MessageProperties) contentHeader[0];
+        String messageType =  mprop.getContentType();
+        if (messageType == null)
+        {
+            _logger.debug("no message type specified, building a byte message");
+            messageType = JMSBytesMessage.MIME_TYPE;
+        }
+        MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+        if (mf == null)
+        {
+            throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
+        }
+        else
+        {
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+        }
+    }
+
 
     public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
     {