You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/10/21 15:48:19 UTC

[4/5] qpid-jms git commit: QPIDJMS-215 Perform Message encoding at send time

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 89094a1..9f90d87 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -34,20 +34,23 @@ import javax.jms.MessageFormatException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.exceptions.IdConversionException;
+import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
-import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+import io.netty.buffer.ByteBuf;
 
 public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
@@ -55,11 +58,15 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     private static final Charset UTF8 = Charset.forName("UTF-8");
     private static final long UINT_MAX = 0xFFFFFFFFL;
 
-    protected final Message message;
-    protected final AmqpConnection connection;
+    protected AmqpConnection connection;
 
-    private Map<Symbol,Object> messageAnnotationsMap;
-    private Map<String,Object> applicationPropertiesMap;
+    private Properties properties;
+    private Header header;
+    private Section body;
+    private Map<Symbol, Object> messageAnnotationsMap;
+    private Map<String, Object> applicationPropertiesMap;
+    private Map<Symbol, Object> deliveryAnnotationsMap;
+    private Map<Symbol, Object> footerMap;
 
     private JmsDestination replyTo;
     private JmsDestination destination;
@@ -74,42 +81,29 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     private Long userSpecifiedTTL = null;
 
     /**
-     * Create a new AMQP Message Facade with an empty message instance.
+     * Initialize the state of this message for send.
      *
      * @param connection
-     *        the AmqpConnection that under which this facade was created.
+     *      The connection that this message is linked to.
      */
-    public AmqpJmsMessageFacade(AmqpConnection connection) {
-        this.message = Proton.message();
-        this.message.setDurable(true);
-
+    public void initialize(AmqpConnection connection) {
         this.connection = connection;
-        setMessageAnnotation(JMS_MSG_TYPE, JMS_MESSAGE);
+
+        setMessageAnnotation(JMS_MSG_TYPE, getJmsMsgType());
+        setPersistent(true); // TODO - Remove to avoid default Header
+        initializeEmptyBody();
     }
 
     /**
-     * Creates a new Facade around an incoming AMQP Message for dispatch to the
-     * JMS Consumer instance.
+     * Initialize the state of this message for receive.
      *
      * @param consumer
-     *        the consumer that received this message.
-     * @param message
-     *        the incoming Message instance that is being wrapped.
+     *      The consumer that this message was read from.
      */
-    @SuppressWarnings("unchecked")
-    public AmqpJmsMessageFacade(AmqpConsumer consumer, Message message) {
-        this.message = message;
+    public void initialize(AmqpConsumer consumer) {
         this.connection = consumer.getConnection();
         this.consumerDestination = consumer.getDestination();
 
-        if (message.getMessageAnnotations() != null) {
-            messageAnnotationsMap = message.getMessageAnnotations().getValue();
-        }
-
-        if (message.getApplicationProperties() != null) {
-            applicationPropertiesMap = message.getApplicationProperties().getValue();
-        }
-
         Long ttl = getTtl();
         Long absoluteExpiryTime = getAbsoluteExpiryTime();
         if (absoluteExpiryTime == null && ttl != null) {
@@ -118,6 +112,13 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     }
 
     /**
+     * Used to indicate that a Message object should empty the body element and make
+     * any other internal updates to reflect the message now has no body value.
+     */
+    protected void initializeEmptyBody() {
+    }
+
+    /**
      * @return the appropriate byte value that indicates the type of message this is.
      */
     public byte getJmsMsgType() {
@@ -132,11 +133,22 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
      * @return a String value indicating the message content type.
      */
     public String getContentType() {
-        return message.getContentType();
+        if (properties != null && properties.getContentType() != null) {
+            return properties.getContentType().toString();
+        }
+
+        return null;
     }
 
     public void setContentType(String value) {
-        message.setContentType(value);
+        if (properties == null) {
+            if (value == null) {
+                return;
+            }
+            lazyCreateProperties();
+        }
+
+        properties.setContentType(Symbol.valueOf(value));
     }
 
     @Override
@@ -212,11 +224,11 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         }
 
         if (ttl > 0 && ttl < UINT_MAX) {
-            message.setTtl(ttl);
+            lazyCreateHeader();
+            header.setTtl(UnsignedInteger.valueOf(ttl));
         } else {
-            Header hdr = message.getHeader();
-            if (hdr != null) {
-                hdr.setTtl(null);
+            if (header != null) {
+                header.setTtl(null);
             }
         }
 
@@ -229,7 +241,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public void clearBody() {
-        message.setBody(null);
+        setBody(null);
     }
 
     @Override
@@ -239,13 +251,14 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public AmqpJmsMessageFacade copy() throws JMSException {
-        AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection);
+        AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade();
         copyInto(copy);
         return copy;
     }
 
-    @SuppressWarnings("unchecked")
     protected void copyInto(AmqpJmsMessageFacade target) {
+        target.connection = connection;
+
         if (consumerDestination != null) {
             target.consumerDestination = consumerDestination;
         }
@@ -266,48 +279,42 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
             target.userSpecifiedTTL = userSpecifiedTTL;
         }
 
-        Message targetMsg = target.getAmqpMessage();
-
-        if (message.getHeader() != null) {
+        if (header != null) {
             Header headers = new Header();
-            headers.setDurable(message.getHeader().getDurable());
-            headers.setPriority(message.getHeader().getPriority());
-            headers.setTtl(message.getHeader().getTtl());
-            headers.setFirstAcquirer(message.getHeader().getFirstAcquirer());
-            headers.setDeliveryCount(message.getHeader().getDeliveryCount());
-            targetMsg.setHeader(headers);
-        }
+            headers.setDurable(header.getDurable());
+            headers.setPriority(header.getPriority());
+            headers.setTtl(header.getTtl());
+            headers.setFirstAcquirer(header.getFirstAcquirer());
+            headers.setDeliveryCount(header.getDeliveryCount());
 
-        if (message.getFooter() != null && message.getFooter().getValue() != null) {
-            Map<Object, Object> newFooterMap = new HashMap<Object, Object>();
-            newFooterMap.putAll(message.getFooter().getValue());
-            targetMsg.setFooter(new Footer(newFooterMap));
+            target.setHeader(headers);
         }
 
-        if (message.getProperties() != null) {
+        if (properties != null) {
             Properties properties = new Properties();
 
-            properties.setMessageId(message.getProperties().getMessageId());
-            properties.setUserId(message.getProperties().getUserId());
-            properties.setTo(message.getProperties().getTo());
-            properties.setSubject(message.getProperties().getSubject());
-            properties.setReplyTo(message.getProperties().getReplyTo());
-            properties.setCorrelationId(message.getProperties().getCorrelationId());
-            properties.setContentType(message.getProperties().getContentType());
-            properties.setContentEncoding(message.getProperties().getContentEncoding());
-            properties.setAbsoluteExpiryTime(message.getProperties().getAbsoluteExpiryTime());
-            properties.setCreationTime(message.getProperties().getCreationTime());
-            properties.setGroupId(message.getProperties().getGroupId());
-            properties.setGroupSequence(message.getProperties().getGroupSequence());
-            properties.setReplyToGroupId(message.getProperties().getReplyToGroupId());
-
-            targetMsg.setProperties(properties);
+            properties.setMessageId(getProperties().getMessageId());
+            properties.setUserId(getProperties().getUserId());
+            properties.setTo(getProperties().getTo());
+            properties.setSubject(getProperties().getSubject());
+            properties.setReplyTo(getProperties().getReplyTo());
+            properties.setCorrelationId(getProperties().getCorrelationId());
+            properties.setContentType(getProperties().getContentType());
+            properties.setContentEncoding(getProperties().getContentEncoding());
+            properties.setAbsoluteExpiryTime(getProperties().getAbsoluteExpiryTime());
+            properties.setCreationTime(getProperties().getCreationTime());
+            properties.setGroupId(getProperties().getGroupId());
+            properties.setGroupSequence(getProperties().getGroupSequence());
+            properties.setReplyToGroupId(getProperties().getReplyToGroupId());
+
+            target.setProperties(properties);
         }
 
-        if (message.getDeliveryAnnotations() != null && message.getDeliveryAnnotations().getValue() != null) {
-            Map<Symbol, Object> newDeliveryAnnotations = new HashMap<Symbol, Object>();
-            newDeliveryAnnotations.putAll(message.getDeliveryAnnotations().getValue());
-            targetMsg.setFooter(new Footer(newDeliveryAnnotations));
+        target.setBody(body);
+
+        if (deliveryAnnotationsMap != null) {
+            target.lazyCreateDeliveryAnnotations();
+            target.deliveryAnnotationsMap.putAll(deliveryAnnotationsMap);
         }
 
         if (applicationPropertiesMap != null) {
@@ -319,33 +326,61 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
             target.lazyCreateMessageAnnotations();
             target.messageAnnotationsMap.putAll(messageAnnotationsMap);
         }
+
+        if (footerMap != null) {
+            target.lazyCreateFooter();
+            target.footerMap.putAll(footerMap);
+        }
     }
 
     @Override
     public String getMessageId() {
-        Object underlying = message.getMessageId();
+        Object underlying = null;
+
+        if (properties != null) {
+            underlying = properties.getMessageId();
+        }
+
         return AmqpMessageIdHelper.INSTANCE.toMessageIdString(underlying);
     }
 
     @Override
     public Object getProviderMessageIdObject() {
-        return message.getMessageId();
+        return properties == null ? null : properties.getMessageId();
     }
 
     @Override
     public void setProviderMessageIdObject(Object messageId) {
-        message.setMessageId(messageId);
+        if (properties == null) {
+            if (messageId == null) {
+                return;
+            }
+
+            lazyCreateProperties();
+        }
+
+        properties.setMessageId(messageId);
     }
 
     @Override
     public void setMessageId(String messageId) throws IdConversionException {
-        message.setMessageId(AmqpMessageIdHelper.INSTANCE.toIdObject(messageId));
+        Object value = AmqpMessageIdHelper.INSTANCE.toIdObject(messageId);
+
+        if (properties == null) {
+            if (value == null) {
+                return;
+            }
+
+            lazyCreateProperties();
+        }
+
+        properties.setMessageId(value);
     }
 
     @Override
     public long getTimestamp() {
-        if (message.getProperties() != null) {
-            Date timestamp = message.getProperties().getCreationTime();
+        if (properties != null) {
+            Date timestamp = properties.getCreationTime();
             if (timestamp != null) {
                 return timestamp.getTime();
             }
@@ -356,39 +391,62 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public void setTimestamp(long timestamp) {
-        if (timestamp != 0) {
-            message.setCreationTime(timestamp);
-        } else {
-            if (message.getProperties() != null) {
-                message.getProperties().setCreationTime(null);
+        if (properties == null) {
+            if (timestamp == 0) {
+                return;
             }
+
+            lazyCreateProperties();
+        }
+
+        if (timestamp == 0) {
+            properties.setCreationTime(null);
+        } else {
+            properties.setCreationTime(new Date(timestamp));
         }
     }
 
     @Override
     public String getCorrelationId() {
-        return AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(message.getCorrelationId());
+        if (properties == null) {
+            return null;
+        }
+
+        return AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
     }
 
     @Override
     public void setCorrelationId(String correlationId) throws IdConversionException {
-        if (correlationId == null) {
-            message.setCorrelationId(null);
-        } else {
+        Object idObject = null;
+
+        if (correlationId != null) {
             if (AmqpMessageIdHelper.INSTANCE.hasMessageIdPrefix(correlationId)) {
                 // JMSMessageID value, process it for possible type conversion
-                Object idObject = AmqpMessageIdHelper.INSTANCE.toIdObject(correlationId);
-                message.setCorrelationId(idObject);
+                idObject = AmqpMessageIdHelper.INSTANCE.toIdObject(correlationId);
             } else {
-                // application-specific value, send as-is
-                message.setCorrelationId(correlationId);
+                idObject = correlationId;
             }
         }
+
+        if (properties == null) {
+            if (idObject == null) {
+                return;
+            }
+
+            lazyCreateProperties();
+        }
+
+        properties.setCorrelationId(idObject);
     }
 
     @Override
     public byte[] getCorrelationIdBytes() throws JMSException {
-        Object correlationId = message.getCorrelationId();
+        Object correlationId = null;
+
+        if (properties != null) {
+            correlationId = properties.getCorrelationId();
+        }
+
         if (correlationId == null) {
             return null;
         } else if (correlationId instanceof Binary) {
@@ -412,17 +470,37 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
             binaryIdValue = new Binary(Arrays.copyOf(correlationId, correlationId.length));
         }
 
-        message.setCorrelationId(binaryIdValue);
+        if (properties == null) {
+            if (binaryIdValue == null) {
+                return;
+            }
+
+            lazyCreateProperties();
+        }
+
+        properties.setCorrelationId(binaryIdValue);
     }
 
     @Override
     public boolean isPersistent() {
-        return message.isDurable();
+        if (header != null && header.getDurable() != null) {
+            return header.getDurable();
+        }
+
+        return false;
     }
 
     @Override
     public void setPersistent(boolean value) {
-        this.message.setDurable(value);
+        if (header == null) {
+            if (value == false) {
+                return;
+            } else {
+                lazyCreateHeader();
+            }
+        }
+
+        header.setDurable(value);
     }
 
     @Override
@@ -437,8 +515,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public int getRedeliveryCount() {
-        if (message.getHeader() != null) {
-            UnsignedInteger count = message.getHeader().getDeliveryCount();
+        if (header != null) {
+            UnsignedInteger count = header.getDeliveryCount();
             if (count != null) {
                 return count.intValue();
             }
@@ -450,11 +528,12 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     @Override
     public void setRedeliveryCount(int redeliveryCount) {
         if (redeliveryCount == 0) {
-            if (message.getHeader() != null) {
-                message.getHeader().setDeliveryCount(null);
+            if (header != null) {
+                header.setDeliveryCount(null);
             }
         } else {
-            message.setDeliveryCount(redeliveryCount);
+            lazyCreateHeader();
+            header.setDeliveryCount(UnsignedInteger.valueOf(redeliveryCount));
         }
     }
 
@@ -478,24 +557,29 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public String getType() {
-        return message.getSubject();
+        if (properties != null) {
+            return properties.getSubject();
+        }
+
+        return null;
     }
 
     @Override
     public void setType(String type) {
         if (type != null) {
-            message.setSubject(type);
+            lazyCreateProperties();
+            properties.setSubject(type);
         } else {
-            if (message.getProperties() != null) {
-                message.getProperties().setSubject(null);
+            if (properties != null) {
+                properties.setSubject(null);
             }
         }
     }
 
     @Override
     public int getPriority() {
-        if (message.getHeader() != null) {
-            UnsignedByte priority = message.getHeader().getPriority();
+        if (header != null) {
+            UnsignedByte priority = header.getPriority();
             if (priority != null) {
                 int scaled = priority.intValue();
                 if (scaled > 9) {
@@ -512,10 +596,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     @Override
     public void setPriority(int priority) {
         if (priority == DEFAULT_PRIORITY) {
-            if (message.getHeader() == null) {
-                return;
-            } else {
-                message.getHeader().setPriority(null);
+            if (header != null) {
+                header.setPriority(null);
             }
         } else {
             byte scaled = (byte) priority;
@@ -525,7 +607,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
                 scaled = 9;
             }
 
-            message.setPriority(scaled);
+            lazyCreateHeader();
+            header.setPriority(UnsignedByte.valueOf(scaled));
         }
     }
 
@@ -636,20 +719,33 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     }
 
     public void setReplyToGroupId(String replyToGroupId) {
-        message.setReplyToGroupId(replyToGroupId);
+        if (replyToGroupId != null) {
+            lazyCreateProperties();
+            properties.setReplyToGroupId(replyToGroupId);
+        } else {
+            if (properties != null) {
+                properties.setReplyToGroupId(null);
+            }
+        }
     }
 
     public String getReplyToGroupId() {
-        return message.getReplyToGroupId();
+        if (properties != null) {
+            return properties.getReplyToGroupId();
+        }
+
+        return null;
     }
 
     @Override
     public String getUserId() {
         String userId = null;
-        byte[] userIdBytes = message.getUserId();
 
-        if (userIdBytes != null) {
-            userId = new String(userIdBytes, UTF8);
+        if (properties != null && properties.getUserId() != null) {
+            Binary userIdBytes = properties.getUserId();
+            if (userIdBytes.getLength() != 0) {
+                userId = new String(userIdBytes.getArray(), userIdBytes.getArrayOffset(), userIdBytes.getLength(), UTF8);
+            }
         }
 
         return userId;
@@ -663,44 +759,66 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         }
 
         if (bytes == null) {
-            if (message.getProperties() != null) {
-                message.getProperties().setUserId(null);
+            if (properties != null) {
+                properties.setUserId(null);
             }
         } else {
-            message.setUserId(bytes);
+            lazyCreateProperties();
+            properties.setUserId(new Binary(bytes));
         }
     }
 
     @Override
     public byte[] getUserIdBytes() {
-        return message.getUserId();
+        if(properties == null || properties.getUserId() == null) {
+            return null;
+        } else {
+            final Binary userId = properties.getUserId();
+            byte[] id = new byte[userId.getLength()];
+            System.arraycopy(userId.getArray(), userId.getArrayOffset(), id, 0, userId.getLength());
+            return id;
+        }
     }
 
     @Override
     public void setUserIdBytes(byte[] userId) {
         if (userId == null || userId.length == 0) {
-            if (message.getProperties() != null) {
-                message.getProperties().setUserId(null);
+            if (properties != null) {
+                properties.setUserId(null);
             }
         } else {
-            message.setUserId(userId);
+            lazyCreateProperties();
+            byte[] id = new byte[userId.length];
+            System.arraycopy(userId, 0, id, 0, userId.length);
+            properties.setUserId(new Binary(id));
         }
     }
 
     @Override
     public String getGroupId() {
-        return message.getGroupId();
+        if (properties != null) {
+            return properties.getGroupId();
+        }
+
+        return null;
     }
 
     @Override
     public void setGroupId(String groupId) {
-        message.setGroupId(groupId);
+        if (groupId != null) {
+            lazyCreateProperties();
+            properties.setGroupId(groupId);
+        } else {
+            if (properties != null) {
+                properties.setGroupId(null);
+            }
+        }
     }
 
     @Override
     public int getGroupSequence() {
-        if (message.getProperties() != null) {
-            UnsignedInteger groupSeqUint = message.getProperties().getGroupSequence();
+        if (properties != null) {
+            UnsignedInteger groupSeqUint = properties.getGroupSequence();
             if (groupSeqUint != null) {
                 // This wraps it into the negative int range if uint is over 2^31-1
                 return groupSeqUint.intValue();
@@ -713,32 +831,26 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     @Override
     public void setGroupSequence(int groupSequence) {
         // This wraps it into the upper uint range if a negative was provided
-        if (groupSequence == 0) {
-            if (message.getProperties() != null) {
-                message.getProperties().setGroupSequence(null);
-            }
+        if (groupSequence != 0) {
+            lazyCreateProperties();
+            properties.setGroupSequence(UnsignedInteger.valueOf(groupSequence));
         } else {
-            message.setGroupSequence(groupSequence);
+            if (properties != null) {
+                properties.setGroupSequence(null);
+            }
         }
     }
 
     @Override
     public boolean hasBody() {
-        return message.getBody() == null;
-    }
-
-    /**
-     * @return the true AMQP Message instance wrapped by this Facade.
-     */
-    public Message getAmqpMessage() {
-        return this.message;
+        return body == null;
     }
 
     /**
      * The AmqpConnection instance that is associated with this Message.
      * @return the connection
      */
-    public AmqpConnection getConnection() {
+    AmqpConnection getConnection() {
         return connection;
     }
 
@@ -813,7 +925,6 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
      */
     void clearMessageAnnotations() {
         messageAnnotationsMap = null;
-        message.setMessageAnnotations(null);
     }
 
     /**
@@ -821,33 +932,151 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
      */
     void clearAllApplicationProperties() {
         applicationPropertiesMap = null;
-        message.setApplicationProperties(null);
     }
 
     String getToAddress() {
-        return message.getAddress();
+        if (properties != null) {
+            return properties.getTo();
+        }
+
+        return null;
     }
 
     void setToAddress(String address) {
-        message.setAddress(address);
+        if (address != null) {
+            lazyCreateProperties();
+            properties.setTo(address);
+        } else {
+            if (properties != null) {
+                properties.setTo(null);
+            }
+        }
     }
 
     String getReplyToAddress() {
-        return message.getReplyTo();
+        if (properties != null) {
+            return properties.getReplyTo();
+        }
+
+        return null;
     }
 
     void setReplyToAddress(String address) {
-        this.message.setReplyTo(address);
+        if (address != null) {
+            lazyCreateProperties();
+            properties.setReplyTo(address);
+        } else {
+            if (properties != null) {
+                properties.setReplyTo(null);
+            }
+        }
     }
 
     JmsDestination getConsumerDestination() {
         return this.consumerDestination;
     }
 
+    public JmsMessage asJmsMessage() {
+        return new JmsMessage(this);
+    }
+
+    @Override
+    public ByteBuf encodeMessage() {
+        return AmqpCodec.encodeMessage(this);
+    }
+
+    //----- Access to AMQP Message Values ------------------------------------//
+
+    Header getHeader() {
+        return header;
+    }
+
+    void setHeader(Header header) {
+        this.header = header;
+    }
+
+    Properties getProperties() {
+        return properties;
+    }
+
+    void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    Section getBody() {
+        return body;
+    }
+
+    void setBody(Section body) {
+        this.body = body;
+    }
+
+    MessageAnnotations getMessageAnnotations() {
+        MessageAnnotations result = null;
+        if (messageAnnotationsMap != null && !messageAnnotationsMap.isEmpty()) {
+            result = new MessageAnnotations(messageAnnotationsMap);
+        }
+
+        return result;
+    }
+
+    void setMessageAnnotations(MessageAnnotations messageAnnotations) {
+        if (messageAnnotations != null) {
+            this.messageAnnotationsMap = messageAnnotations.getValue();
+        }
+    }
+
+    DeliveryAnnotations getDeliveryAnnotations() {
+        DeliveryAnnotations result = null;
+        if (deliveryAnnotationsMap != null && !deliveryAnnotationsMap.isEmpty()) {
+            result = new DeliveryAnnotations(deliveryAnnotationsMap);
+        }
+
+        return result;
+    }
+
+    void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
+        if (deliveryAnnotations != null) {
+            this.deliveryAnnotationsMap = deliveryAnnotations.getValue();
+        }
+    }
+
+    ApplicationProperties getApplicationProperties() {
+        ApplicationProperties result = null;
+        if (applicationPropertiesMap != null && !applicationPropertiesMap.isEmpty()) {
+            result = new ApplicationProperties(applicationPropertiesMap);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    void setApplicationProperties(ApplicationProperties applicationProperties) {
+        if (applicationProperties != null) {
+            this.applicationPropertiesMap = applicationProperties.getValue();
+        }
+    }
+
+    Footer getFooter() {
+        Footer result = null;
+        if (footerMap != null && footerMap.isEmpty()) {
+            result = new Footer(footerMap);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    void setFooter(Footer footer) {
+        if (footer != null) {
+            this.footerMap = footer.getValue();
+        }
+    }
+
+    //----- Internal Message Utility Methods ---------------------------------//
+
     private Long getAbsoluteExpiryTime() {
         Long result = null;
-        if (message.getProperties() != null) {
-            Date date = message.getProperties().getAbsoluteExpiryTime();
+        if (properties != null) {
+            Date date = properties.getAbsoluteExpiryTime();
             if (date != null) {
                 result = date.getTime();
             }
@@ -858,8 +1087,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     private Long getTtl() {
         Long result = null;
-        if (message.getHeader() != null) {
-            UnsignedInteger ttl = message.getHeader().getTtl();
+        if (header != null) {
+            UnsignedInteger ttl = header.getTtl();
             if (ttl != null) {
                 result = ttl.longValue();
             }
@@ -869,26 +1098,49 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     }
 
     private void setAbsoluteExpiryTime(Long expiration) {
-        if (expiration == null) {
-            if (message.getProperties() != null) {
-                message.getProperties().setAbsoluteExpiryTime(null);
+        if (expiration == null || expiration == 0l) {
+            if (properties != null) {
+                properties.setAbsoluteExpiryTime(null);
             }
         } else {
-            message.setExpiryTime(expiration);
+            lazyCreateProperties();
+            properties.setAbsoluteExpiryTime(new Date(expiration));
+        }
+    }
+
+    private void lazyCreateProperties() {
+        if (properties == null) {
+            properties = new Properties();
+        }
+    }
+
+    private void lazyCreateHeader() {
+        if (header == null) {
+            header = new Header();
         }
     }
 
     private void lazyCreateMessageAnnotations() {
         if (messageAnnotationsMap == null) {
-            messageAnnotationsMap = new HashMap<Symbol,Object>();
-            message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+            messageAnnotationsMap = new HashMap<Symbol, Object>();
+        }
+    }
+
+    private void lazyCreateDeliveryAnnotations() {
+        if (deliveryAnnotationsMap == null) {
+            deliveryAnnotationsMap = new HashMap<Symbol, Object>();
         }
     }
 
     private void lazyCreateApplicationProperties() {
         if (applicationPropertiesMap == null) {
             applicationPropertiesMap = new HashMap<String, Object>();
-            message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+        }
+    }
+
+    private void lazyCreateFooter() {
+        if (footerMap == null) {
+            footerMap = new HashMap<Symbol, Object>();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
index 5b78556..25e1489 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
@@ -29,8 +29,6 @@ import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsObjectMessage;
 import org.apache.qpid.jms.message.JmsStreamMessage;
 import org.apache.qpid.jms.message.JmsTextMessage;
-import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
-import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 
 /**
@@ -53,7 +51,9 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
 
     @Override
     public JmsMessage createMessage() throws JMSException {
-        return new JmsMessage(new AmqpJmsMessageFacade(connection));
+        AmqpJmsMessageFacade facade = new AmqpJmsMessageFacade();
+        facade.initialize(connection);
+        return facade.asJmsMessage();
     }
 
     @Override
@@ -63,29 +63,35 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
 
     @Override
     public JmsTextMessage createTextMessage(String payload) throws JMSException {
-
-        JmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(connection);
+        AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
+        facade.initialize(connection);
 
         if (payload != null) {
             facade.setText(payload);
         }
 
-        return new JmsTextMessage(facade);
+        return facade.asJmsMessage();
     }
 
     @Override
     public JmsBytesMessage createBytesMessage() throws JMSException {
-        return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection));
+        AmqpJmsBytesMessageFacade facade = new AmqpJmsBytesMessageFacade();
+        facade.initialize(connection);
+        return facade.asJmsMessage();
     }
 
     @Override
     public JmsMapMessage createMapMessage() throws JMSException {
-        return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection));
+        AmqpJmsMapMessageFacade facade = new AmqpJmsMapMessageFacade();
+        facade.initialize(connection);
+        return facade.asJmsMessage();
     }
 
     @Override
     public JmsStreamMessage createStreamMessage() throws JMSException {
-        return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection));
+        AmqpJmsStreamMessageFacade facade = new AmqpJmsStreamMessageFacade();
+        facade.initialize(connection);
+        return facade.asJmsMessage();
     }
 
     @Override
@@ -95,9 +101,9 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
 
     @Override
     public JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException {
-        JmsObjectMessageFacade facade = new AmqpJmsObjectMessageFacade(
-            connection, connection.isObjectMessageUsesAmqpTypes());
+        AmqpJmsObjectMessageFacade facade = new AmqpJmsObjectMessageFacade();
 
+        facade.initialize(connection);
         if (payload != null) {
             try {
                 facade.setObject(payload);
@@ -106,6 +112,6 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
             }
         }
 
-        return new JmsObjectMessage(facade);
+        return facade.asJmsMessage();
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index f4b541f..e0f026f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -16,7 +16,6 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
 
 import java.io.IOException;
@@ -25,13 +24,11 @@ import java.io.Serializable;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsObjectMessage;
 import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
-import org.apache.qpid.proton.message.Message;
-
-import io.netty.buffer.ByteBuf;
 
 /**
  * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
@@ -40,46 +37,20 @@ import io.netty.buffer.ByteBuf;
 public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements JmsObjectMessageFacade {
 
     private AmqpObjectTypeDelegate delegate;
+    private JmsDeserializationPolicy deserializationPolicy;
 
-    private final JmsDeserializationPolicy deserializationPolicy;
-
-    /**
-     * Creates a new facade instance for outgoing message
-     *
-     * @param connection
-     *        the AmqpConnection that under which this facade was created.
-     * @param isAmqpTypeEncoded
-     *        controls the type used to encode the body.
-     */
-    public AmqpJmsObjectMessageFacade(AmqpConnection connection, boolean isAmqpTypeEncoded) {
-        this(connection, isAmqpTypeEncoded, null);
-    }
-
-    private AmqpJmsObjectMessageFacade(AmqpConnection connection, boolean isAmqpTypeEncoded, JmsDeserializationPolicy deserializationPolicy) {
-        super(connection);
-        this.deserializationPolicy = deserializationPolicy;
-
-        setMessageAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE);
-        initDelegate(isAmqpTypeEncoded, null);
+    @Override
+    public void initialize(AmqpConnection connection) {
+        super.initialize(connection);
+        initDelegate(connection.isObjectMessageUsesAmqpTypes());
     }
 
-    /**
-     * Creates a new Facade around an incoming AMQP Message for dispatch to the
-     * JMS Consumer instance.
-     *
-     * @param consumer
-     *        the consumer that received this message.
-     * @param message
-     *        the incoming Message instance that is being wrapped.
-     * @param messageBytes
-     *        a copy of the raw bytes of the incoming message.
-     */
-    public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message, ByteBuf messageBytes) {
-        super(consumer, message);
+    @Override
+    public void initialize(AmqpConsumer consumer) {
+        super.initialize(consumer);
         deserializationPolicy = consumer.getResourceInfo().getDeserializationPolicy();
-
-        boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(message.getContentType());
-        initDelegate(!javaSerialized, messageBytes);
+        boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(getContentType());
+        initDelegate(!javaSerialized);
     }
 
     /**
@@ -96,7 +67,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
 
     @Override
     public AmqpJmsObjectMessageFacade copy() throws JMSException {
-        AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade(connection, isAmqpTypedEncoding(), deserializationPolicy);
+        AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade();
+        copy.deserializationPolicy = deserializationPolicy;
+        copy.initDelegate(isAmqpTypedEncoding());
         copyInto(copy);
         try {
             delegate.copyInto(copy.delegate);
@@ -135,6 +108,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
         delegate.onSend();
     }
 
+    @Override
+    public JmsObjectMessage asJmsMessage() {
+        return new JmsObjectMessage(this);
+    }
+
     void setUseAmqpTypedEncoding(boolean useAmqpTypedEncoding) throws JMSException {
         if (useAmqpTypedEncoding != delegate.isAmqpTypeEncoded()) {
             try {
@@ -142,9 +120,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
 
                 AmqpObjectTypeDelegate newDelegate = null;
                 if (useAmqpTypedEncoding) {
-                    newDelegate = new AmqpTypedObjectDelegate(this, null);
+                    newDelegate = new AmqpTypedObjectDelegate(this);
                 } else {
-                    newDelegate = new AmqpSerializedObjectDelegate(this, null, deserializationPolicy);
+                    newDelegate = new AmqpSerializedObjectDelegate(this, deserializationPolicy);
                 }
 
                 newDelegate.setObject(existingObject);
@@ -156,11 +134,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
         }
     }
 
-    private void initDelegate(boolean useAmqpTypes, ByteBuf messageBytes) {
+    private void initDelegate(boolean useAmqpTypes) {
         if (!useAmqpTypes) {
-            delegate = new AmqpSerializedObjectDelegate(this, messageBytes, deserializationPolicy);
+            delegate = new AmqpSerializedObjectDelegate(this, deserializationPolicy);
         } else {
-            delegate = new AmqpTypedObjectDelegate(this, messageBytes);
+            delegate = new AmqpTypedObjectDelegate(this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index 64f1fbd..4843065 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -16,7 +16,6 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
 
 import java.util.ArrayList;
@@ -25,14 +24,12 @@ import java.util.List;
 
 import javax.jms.MessageEOFException;
 
+import org.apache.qpid.jms.message.JmsStreamMessage;
 import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
 
 /**
  * Wrapper around an AMQP Message instance that will be treated as a JMS StreamMessage
@@ -43,61 +40,11 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
     private List<Object> list;
     private int position = 0;
 
-    /**
-     * Create a new facade ready for sending.
-     *
-     * @param connection
-     *        the AmqpConnection that under which this facade was created.
-     */
-    public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
-        super(connection);
-        list = initializeEmptyBodyList(true);
-        setMessageAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE);
-    }
-
-    /**
-     * Creates a new Facade around an incoming AMQP Message for dispatch to the
-     * JMS Consumer instance.
-     *
-     * @param consumer
-     *        the consumer that received this message.
-     * @param message
-     *        the incoming Message instance that is being wrapped.
-     */
-    @SuppressWarnings("unchecked")
-    public AmqpJmsStreamMessageFacade(AmqpConsumer consumer, Message message) {
-        super(consumer, message);
-
-        Section body = getAmqpMessage().getBody();
-        if (body == null) {
-            list = initializeEmptyBodyList(true);
-        } else if (body instanceof AmqpValue) {
-            Object value = ((AmqpValue) body).getValue();
-
-            if (value == null) {
-                list = initializeEmptyBodyList(false);
-            } else if (value instanceof List) {
-                list = (List<Object>) value;
-            } else {
-                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
-            }
-        } else if (body instanceof AmqpSequence) {
-            List<?> value = ((AmqpSequence) body).getValue();
-
-            if (value == null) {
-                list = initializeEmptyBodyList(true);
-            } else {
-                list = (List<Object>) value;
-            }
-        } else {
-            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
-        }
-    }
-
     @Override
     public AmqpJmsStreamMessageFacade copy() {
-        AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade(connection);
+        AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade();
         copyInto(copy);
+        copy.initializeEmptyBodyList(getBody() instanceof AmqpSequence);
         copy.list.addAll(list);
         return copy;
     }
@@ -166,13 +113,53 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
         return !list.isEmpty();
     }
 
+    @Override
+    public JmsStreamMessage asJmsMessage() {
+        return new JmsStreamMessage(this);
+    }
+
+    @Override
+    protected void initializeEmptyBody() {
+        list = initializeEmptyBodyList(true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    void setBody(Section body) {
+        if (body == null) {
+            list = initializeEmptyBodyList(true);
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value == null) {
+                list = initializeEmptyBodyList(false);
+            } else if (value instanceof List) {
+                list = (List<Object>) value;
+                super.setBody(body);
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else if (body instanceof AmqpSequence) {
+            List<?> value = ((AmqpSequence) body).getValue();
+
+            if (value == null) {
+                list = initializeEmptyBodyList(true);
+            } else {
+                list = (List<Object>) value;
+                super.setBody(body);
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
     private List<Object> initializeEmptyBodyList(boolean useSequenceBody) {
         List<Object> emptyList = new ArrayList<Object>();
 
         if (useSequenceBody) {
-            message.setBody(new AmqpSequence(emptyList));
+            setBody(new AmqpSequence(emptyList));
         } else {
-            message.setBody(new AmqpValue(emptyList));
+            setBody(new AmqpValue(emptyList));
         }
 
         return emptyList;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index 44ed9e6..5595362 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -16,7 +16,6 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
 
 import java.nio.ByteBuffer;
@@ -28,14 +27,12 @@ import java.nio.charset.StandardCharsets;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsTextMessage;
 import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
 
 /**
  * Wrapper around an AMQP Message instance that will be treated as a JMS TextMessage
@@ -45,32 +42,11 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
 
     private final Charset charset;
 
-    /**
-     * Create a new AMQP Message facade ready for sending.
-     *
-     * @param connection
-     *        the AmqpConnection that under which this facade was created.
-     */
-    public AmqpJmsTextMessageFacade(AmqpConnection connection) {
-        super(connection);
-        setMessageAnnotation(JMS_MSG_TYPE, JMS_TEXT_MESSAGE);
-        setText(null);
-        charset = StandardCharsets.UTF_8;
+    public AmqpJmsTextMessageFacade() {
+        this(StandardCharsets.UTF_8);
     }
 
-    /**
-     * Creates a new Facade around an incoming AMQP Message for dispatch to the
-     * JMS Consumer instance.
-     *
-     * @param consumer
-     *        the consumer that received this message.
-     * @param message
-     *        the incoming Message instance that is being wrapped.
-     * @param charset
-     *        the character set to use when decoding the text when the body is a Data section
-     */
-    public AmqpJmsTextMessageFacade(AmqpConsumer consumer, Message message, Charset charset) {
-        super(consumer, message);
+    AmqpJmsTextMessageFacade(Charset charset) {
         this.charset = charset;
     }
 
@@ -84,7 +60,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
 
     @Override
     public AmqpJmsTextMessageFacade copy() throws JMSException {
-        AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade(connection);
+        AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade();
         copyInto(copy);
         copy.setText(getText());
         return copy;
@@ -92,7 +68,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
 
     @Override
     public String getText() throws JMSException {
-        Section body = getAmqpMessage().getBody();
+        Section body = getBody();
 
         if (body == null) {
             return null;
@@ -126,13 +102,12 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
 
     @Override
     public void setText(String value) {
-        AmqpValue body = new AmqpValue(value);
-        getAmqpMessage().setBody(body);
+        setBody(new AmqpValue(value));
     }
 
     @Override
     public void clearBody() {
-        setText(null);
+        setBody(new AmqpValue(null));
     }
 
     @Override
@@ -144,7 +119,17 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
         }
     }
 
+    @Override
+    public JmsTextMessage asJmsMessage() {
+        return new JmsTextMessage(this);
+    }
+
     Charset getCharset() {
         return charset;
     }
+
+    @Override
+    protected void initializeEmptyBody() {
+        setBody(new AmqpValue(null));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 40987e1..702870b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp.message;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 
 import io.netty.buffer.ByteBuf;
@@ -117,14 +118,14 @@ public final class AmqpMessageSupport {
      *
      * @param key
      *        the String key to use to lookup an annotation.
-     * @param message
-     *        the AMQP message object that is being examined.
+     * @param messageAnnotations
+     *        the AMQP message annotations object that is being examined.
      *
      * @return the given annotation value or null if not present in the message.
      */
-    public static Object getMessageAnnotation(String key, Message message) {
-        if (message != null && message.getMessageAnnotations() != null) {
-            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+    public static Object getMessageAnnotation(String key, MessageAnnotations messageAnnotations) {
+        if (messageAnnotations != null && messageAnnotations.getValue() != null) {
+            Map<Symbol, Object> annotations = messageAnnotations.getValue();
             return annotations.get(AmqpMessageSupport.getSymbol(key));
         }
 
@@ -138,16 +139,18 @@ public final class AmqpMessageSupport {
      *
      * @param contentType
      *        content type string to compare against, or null if none
-     * @param message
-     *        the AMQP message object that is being examined.
+     * @param messageContentType
+     *        the content type value read from an AMQP message object.
      *
      * @return true if content type matches
      */
-    public static boolean isContentType(String contentType, Message message) {
+    public static boolean isContentType(String contentType, Symbol messageContentType) {
         if (contentType == null) {
-            return message.getContentType() == null;
+            return messageContentType == null;
+        } else if (messageContentType == null) {
+            return false;
         } else {
-            return contentType.equals(message.getContentType());
+            return contentType.equals(messageContentType.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
index ec73ba9..b8cdbb4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -17,14 +17,12 @@
 package org.apache.qpid.jms.provider.amqp.message;
 
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream;
@@ -32,9 +30,6 @@ import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream.TrustedClassF
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
-
-import io.netty.buffer.ByteBuf;
 
 /**
  * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
@@ -56,10 +51,7 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
     }
 
     private final AmqpJmsMessageFacade parent;
-    private final Message message;
-    private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
     private final JmsDeserializationPolicy deserializationPolicy;
-    private ByteBuf messageBytes;
     private boolean localContent;
 
     /**
@@ -67,23 +59,14 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
      *
      * @param parent
      *        the AMQP message facade instance where the object is to be stored / read.
-     * @param messageBytes
-     *        the raw bytes that comprise the message when it was received.
      * @param deserializationPolicy
      *        the JmsDeserializationPolicy that is used to validate the security of message
      *        content, may be null (e.g on new outgoing messages).
      */
-    public AmqpSerializedObjectDelegate(AmqpJmsMessageFacade parent, ByteBuf messageBytes, JmsDeserializationPolicy deserializationPolicy) {
+    public AmqpSerializedObjectDelegate(AmqpJmsMessageFacade parent, JmsDeserializationPolicy deserializationPolicy) {
         this.parent = parent;
-        this.message = parent.getAmqpMessage();
-        this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-        this.messageBytes = messageBytes;
+        this.parent.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
         this.deserializationPolicy = deserializationPolicy;
-
-        // Cache the body so the first access can grab it without extra work.
-        if (messageBytes != null) {
-            cachedReceivedBody.set(message.getBody());
-        }
     }
 
     private static byte[] getSerializedBytes(Serializable value) throws IOException {
@@ -100,31 +83,24 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
 
     @Override
     public Serializable getObject() throws IOException, ClassNotFoundException {
-        Binary bin = null;
-
-        Section body = cachedReceivedBody.getAndSet(null);
-        if (body == null) {
-            if (messageBytes != null) {
-                body = decodeMessage(messageBytes).getBody();
-            } else {
-                body = message.getBody();
-            }
-        }
+        Binary binary = null;
+
+        Section body = parent.getBody();
 
         if (body == null || body == NULL_OBJECT_BODY) {
             return null;
         } else if (body instanceof Data) {
-            bin = ((Data) body).getValue();
+            binary = ((Data) body).getValue();
         } else {
             throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
         }
 
-        if (bin == null) {
+        if (binary == null) {
             return null;
         } else {
             Serializable serialized = null;
 
-            try (ByteArrayInputStream bais = new ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(binary.getArray(), binary.getArrayOffset(), binary.getLength());
                  ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(bais, this)) {
 
                 serialized = (Serializable) objIn.readObject();
@@ -136,24 +112,21 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
 
     @Override
     public void setObject(Serializable value) throws IOException {
-        cachedReceivedBody.set(null);
-
         if (value == null) {
-            message.setBody(NULL_OBJECT_BODY);
+            parent.setBody(NULL_OBJECT_BODY);
         } else {
             byte[] bytes = getSerializedBytes(value);
-            message.setBody(new Data(new Binary(bytes)));
+            parent.setBody(new Data(new Binary(bytes)));
         }
 
-        messageBytes = null;
         localContent = true;
     }
 
     @Override
     public void onSend() {
-        message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-        if (message.getBody() == null) {
-            message.setBody(NULL_OBJECT_BODY);
+        parent.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+        if (parent.getBody() == null) {
+            parent.setBody(NULL_OBJECT_BODY);
         }
     }
 
@@ -164,20 +137,11 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
         } else {
             AmqpSerializedObjectDelegate target = (AmqpSerializedObjectDelegate) copy;
 
-            // Swap our cached value to the copy, we will just decode it if we need it.
-            target.cachedReceivedBody.set(cachedReceivedBody.getAndSet(null));
-
-            // If we have the original bytes just copy those and let the next get
-            // decode them into the payload, otherwise we need to do a deep copy.
-            if (messageBytes != null) {
-                target.messageBytes = messageBytes.copy();
-            }
-
             target.localContent = localContent;
 
             // Copy the already encoded message body if it exists, subsequent gets
             // will deserialize the data so no mutations can occur.
-            target.message.setBody(message.getBody());
+            target.parent.setBody(parent.getBody());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
index 1296eaa..88b7b9d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -16,20 +16,15 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
 
 import io.netty.buffer.ByteBuf;
 
@@ -41,39 +36,31 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
 
     static final AmqpValue NULL_OBJECT_BODY = new AmqpValue(null);
 
-    private final Message message;
-    private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
-    private ByteBuf messageBytes;
+    private ByteBuf encodedBody;
+    private final AmqpJmsMessageFacade parent;
 
     /**
      * Create a new delegate that uses Java serialization to store the message content.
      *
      * @param parent
      *        the AMQP message facade instance where the object is to be stored / read.
-     * @param messageBytes
-     *        the raw bytes that comprise the AMQP message that was received.
      */
-    public AmqpTypedObjectDelegate(AmqpJmsMessageFacade parent, ByteBuf messageBytes) {
-        this.message = parent.getAmqpMessage();
-        this.message.setContentType(null);
-        this.messageBytes = messageBytes;
-
-        // Cache the body so the first access can grab it without extra work.
-        if (messageBytes != null) {
-            cachedReceivedBody.set(message.getBody());
+    public AmqpTypedObjectDelegate(AmqpJmsMessageFacade parent) {
+        this.parent = parent;
+        this.parent.setContentType(null);
+
+        // Create a duplicate of the message body for decode on read attempts
+        if (parent.getBody() != null) {
+            encodedBody = AmqpCodec.encode(parent.getBody());
         }
     }
 
     @Override
     public Serializable getObject() throws IOException, ClassNotFoundException {
-        Section body = cachedReceivedBody.getAndSet(null);
+        Section body = null;
 
-        if (body == null) {
-            if (messageBytes != null) {
-                body = decodeMessage(messageBytes).getBody();
-            } else {
-                body = message.getBody();
-            }
+        if (encodedBody != null) {
+            body = AmqpCodec.decode(encodedBody);
         }
 
         if (body == null) {
@@ -98,20 +85,15 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
 
     @Override
     public void setObject(Serializable value) throws IOException {
-        cachedReceivedBody.set(null);
-
         if (value == null) {
-            message.setBody(NULL_OBJECT_BODY);
-            messageBytes = null;
+            parent.setBody(NULL_OBJECT_BODY);
+            encodedBody = null;
         } else if (isSupportedAmqpValueObjectType(value)) {
-            Message transfer = Message.Factory.create();
-
             // Exchange the incoming body value for one that is created from encoding
             // and decoding the value. Save the bytes for subsequent getObject and
             // copyInto calls to use.
-            transfer.setBody(new AmqpValue(value));
-            messageBytes = encodeMessage(transfer);
-            transfer = decodeMessage(messageBytes);
+            encodedBody = AmqpCodec.encode(new AmqpValue(value));
+            Section decodedBody = AmqpCodec.decode(encodedBody);
 
             // This step requires a heavy-weight operation of both encoding and decoding the
             // incoming body value in order to create a copy such that changes to the original
@@ -120,7 +102,7 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
             // proton such that we can encode the body and use those bytes directly on the
             // message as it is being sent.
 
-            message.setBody(transfer.getBody());
+            parent.setBody(decodedBody);
         } else {
             // TODO: Data and AmqpSequence?
             throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
@@ -129,9 +111,9 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
 
     @Override
     public void onSend() {
-        message.setContentType(null);
-        if (message.getBody() == null) {
-            message.setBody(NULL_OBJECT_BODY);
+        parent.setContentType(null);
+        if (parent.getBody() == null) {
+            parent.setBody(NULL_OBJECT_BODY);
         }
     }
 
@@ -142,23 +124,17 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
         } else {
             AmqpTypedObjectDelegate target = (AmqpTypedObjectDelegate) copy;
 
-            // Swap our cached value (if any) to the copy, we will just decode it if we need it later.
-            target.cachedReceivedBody.set(cachedReceivedBody.getAndSet(null));
-
-            if (messageBytes != null) {
-                // If we have the original bytes just copy those and let the next get
-                // decode them into the payload (or for the copy, use the cached
-                // body if it was swapped above).
-                target.messageBytes = messageBytes.copy();
+            // If there ever was a body then we will have a snapshot of it and we can
+            // be sure that our state is correct.
+            if (encodedBody != null) {
+                // If we have any body bytes just duplicate those and let the next get
+                // decode them into the returned object payload value.
+                target.encodedBody = encodedBody.duplicate();
 
                 // Internal message body copy to satisfy sends. This is safe since the body was set
                 // from a copy (decoded from the bytes) to ensure it is a snapshot. Also safe for
                 // gets as they will use the message bytes (or cached body if set) to return the object.
-                target.message.setBody(message.getBody());
-            } else {
-                // We have to deep get/set copy here, otherwise a get might return
-                // the object value carried by the original version.
-                copy.setObject(getObject());
+                target.parent.setBody(parent.getBody());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
new file mode 100644
index 0000000..7088b5f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Writable Buffer implementation based on a Netty ByteBuf
+ */
+public class AmqpWritableBuffer implements WritableBuffer {
+
+    public ByteBuf nettyBuffer;
+
+    public AmqpWritableBuffer() {
+        nettyBuffer = Unpooled.buffer(1024);
+    }
+
+    public AmqpWritableBuffer(ByteBuf buffer) {
+        nettyBuffer = buffer;
+    }
+
+    public ByteBuf getBuffer() {
+        return nettyBuffer;
+    }
+
+    @Override
+    public void put(byte b) {
+        nettyBuffer.writeByte(b);
+    }
+
+    @Override
+    public void putFloat(float f) {
+        nettyBuffer.writeFloat(f);
+    }
+
+    @Override
+    public void putDouble(double d) {
+        nettyBuffer.writeDouble(d);
+    }
+
+    @Override
+    public void put(byte[] src, int offset, int length) {
+        nettyBuffer.writeBytes(src, offset, length);
+    }
+
+    @Override
+    public void put(ByteBuffer payload) {
+        nettyBuffer.writeBytes(payload);
+    }
+
+    public void put(ByteBuf payload) {
+        nettyBuffer.writeBytes(payload);
+    }
+
+    @Override
+    public void putShort(short s) {
+        nettyBuffer.writeShort(s);
+    }
+
+    @Override
+    public void putInt(int i) {
+        nettyBuffer.writeInt(i);
+    }
+
+    @Override
+    public void putLong(long l) {
+        nettyBuffer.writeLong(l);
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return nettyBuffer.writerIndex() < nettyBuffer.capacity();
+    }
+
+    @Override
+    public int remaining() {
+        return nettyBuffer.capacity() - nettyBuffer.writerIndex();
+    }
+
+    @Override
+    public int position() {
+        return nettyBuffer.writerIndex();
+    }
+
+    @Override
+    public void position(int position) {
+        nettyBuffer.writerIndex(position);
+    }
+
+    @Override
+    public int limit() {
+        return nettyBuffer.capacity();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index f9188c5..3254c87 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -29,14 +30,17 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageNotWriteableException;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
+import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -383,4 +387,105 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testAsyncSendMarksBytesMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            BytesMessage message = session.createBytesMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer which we will not send any response so that we can check that
+            // the inflight message is read-only
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSDeliveryMode(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSDestination(queue);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSExpiration(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSMessageID(queueName);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSPriority(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSRedelivered(false);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSReplyTo(queue);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSTimestamp(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSType(queueName);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setStringProperty("test", "test");
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.writeBoolean(true);
+                fail("Message should not be writable after a send.");
+            } catch (MessageNotWriteableException mnwe) {}
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private class TestJmsCompletionListener implements CompletionListener {
+
+        @Override
+        public void onCompletion(Message message) {
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 10bcffd..02ca520 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -29,15 +29,18 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import javax.jms.CompletionListener;
 import javax.jms.Connection;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
+import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -263,4 +266,159 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testSendMapMessageIsWritable() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            String myIntKey = "myInt";
+            int myInt = Integer.MAX_VALUE;
+            String myStringKey = "myString";
+            String myString = myStringKey;
+
+            // Prepare a MapMessage to send to the test peer to send
+            MapMessage mapMessage = session.createMapMessage();
+
+            mapMessage.setString(myStringKey, myString);
+
+            // prepare a matcher for the test peer to use to receive and verify the message
+            Map<String, Object> map = new LinkedHashMap<String, Object>();
+            map.put(myStringKey, myString);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE), equalTo(AmqpMessageSupport.JMS_MAP_MESSAGE));
+            MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(map));
+
+            testPeer.expectTransfer(messageMatcher);
+
+            // send the message
+            producer.send(mapMessage);
+
+            // Update the message and matcher and send again
+            mapMessage.setInt(myIntKey, myInt);
+            map.put(myIntKey, myInt);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
+
+            producer.send(mapMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAsyncSendMarksMapMessageReadOnly() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(15000);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            MapMessage message = session.createMapMessage();
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach and grant it some credit, it should send
+            // a transfer which we will not send any response so that we can check that
+            // the inflight message is read-only
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher);
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (Throwable error) {
+                fail("Send should not fail for async.");
+            }
+
+            try {
+                message.setJMSCorrelationID("test");
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSCorrelationIDAsBytes(new byte[]{});
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSDeliveryMode(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSDestination(queue);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSExpiration(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSMessageID(queueName);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSPriority(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSRedelivered(false);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSReplyTo(queue);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSTimestamp(0);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setJMSType(queueName);
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setStringProperty("test", "test");
+                fail("Should not be able to set properties on inflight message");
+            } catch (MessageNotWriteableException mnwe) {}
+            try {
+                message.setString("test", "test");
+                fail("Message should not be writable after a send.");
+            } catch (MessageNotWriteableException mnwe) {}
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private class TestJmsCompletionListener implements CompletionListener {
+
+        @Override
+        public void onCompletion(Message message) {
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org