You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/26 13:19:48 UTC

[1/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2453978f4 -> 7a463f038


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 94df3a5..74f399e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessa
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -40,6 +42,8 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.buffer.Unpooled;
+
 
 public class TestConversions extends Assert {
 
@@ -58,12 +62,11 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(new Boolean(true)));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage serverMessage = encodedMessage.toCore();
 
       verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage));
-
    }
 
    @Test
@@ -81,7 +84,7 @@ public class TestConversions extends Assert {
 
       message.setBody(new Data(new Binary(bodyBytes)));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage serverMessage = encodedMessage.toCore();
 
@@ -96,7 +99,6 @@ public class TestConversions extends Assert {
       bytesMessage.readBytes(newBodyBytes);
 
       Assert.assertArrayEquals(bodyBytes, newBodyBytes);
-
    }
 
    private void verifyProperties(javax.jms.Message message) throws Exception {
@@ -135,7 +137,7 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpValue(mapValues));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage serverMessage = encodedMessage.toCore();
       serverMessage.getReadOnlyBodyBuffer();
@@ -145,11 +147,11 @@ public class TestConversions extends Assert {
 
       verifyProperties(mapMessage);
 
-      Assert.assertEquals(1, mapMessage.getInt("someint"));
-      Assert.assertEquals("value", mapMessage.getString("somestr"));
+      assertEquals(1, mapMessage.getInt("someint"));
+      assertEquals("value", mapMessage.getString("somestr"));
 
       AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
-      System.out.println(newAMQP.getProtonMessage().getBody());
+      assertNotNull(newAMQP.getBody());
    }
 
    @Test
@@ -165,7 +167,7 @@ public class TestConversions extends Assert {
 
       message.setBody(new AmqpSequence(objects));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage serverMessage = encodedMessage.toCore();
 
@@ -189,7 +191,7 @@ public class TestConversions extends Assert {
       String text = "someText";
       message.setBody(new AmqpValue(text));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage serverMessage = encodedMessage.toCore();
 
@@ -198,8 +200,7 @@ public class TestConversions extends Assert {
 
       verifyProperties(textMessage);
 
-      Assert.assertEquals(text, textMessage.getText());
-
+      assertEquals(text, textMessage.getText());
    }
 
    @Test
@@ -209,7 +210,7 @@ public class TestConversions extends Assert {
       String text = "someText";
       message.setBody(new AmqpValue(text));
 
-      AMQPMessage encodedMessage = new AMQPMessage(message);
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
       TypedProperties extraProperties = createTypedPropertiesMap();
       extraProperties.putBytesProperty(new SimpleString("bytesProp"), "value".getBytes());
       encodedMessage.setExtraProperties(extraProperties);
@@ -222,8 +223,15 @@ public class TestConversions extends Assert {
       verifyProperties(textMessage);
       assertEquals("value", new String(((byte[]) textMessage.getObjectProperty("bytesProp"))));
 
-      Assert.assertEquals(text, textMessage.getText());
-
+      assertEquals(text, textMessage.getText());
    }
 
+   private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
+      NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
+      message.encode(encoded);
+
+      NettyReadable readable = new NettyReadable(encoded.getByteBuf());
+
+      return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
index b7092c3..691bb0d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
@@ -16,12 +16,11 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -30,6 +29,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
@@ -39,7 +45,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.qpid.proton.Proton;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -47,13 +54,11 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import io.netty.buffer.Unpooled;
 
 public class JMSMappingInboundTransformerTest {
 
@@ -72,10 +77,10 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
-      Message message = Message.Factory.create();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
 
-      AMQPMessage messageEncode = new AMQPMessage(message);
+      AMQPMessage messageEncode = encodeAndCreateAMQPMessage(message);
 
       ICoreMessage coreMessage = messageEncode.toCore();
 
@@ -94,9 +99,9 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
-      Message message = Message.Factory.create();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -104,10 +109,10 @@ public class JMSMappingInboundTransformerTest {
 
    @Test
    public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
-      Message message = Message.Factory.create();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setContentType("text/plain");
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -125,12 +130,13 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      AMQPMessage amqp = encodeAndCreateAMQPMessage(message);
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(amqp.toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -145,12 +151,12 @@ public class JMSMappingInboundTransformerTest {
     *         if an error occurs during the test.
     */
    public void testCreateBytesMessageFromDataWithUnknownContentType() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
       message.setContentType("unknown-content-type");
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -165,13 +171,13 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
 
       assertNull(message.getContentType());
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -187,12 +193,12 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
       message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -287,12 +293,12 @@ public class JMSMappingInboundTransformerTest {
    }
 
    private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new Data(binary));
       message.setContentType(contentType);
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@@ -313,10 +319,10 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateTextMessageFromAmqpValueWithString() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setBody(new AmqpValue("content"));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -331,10 +337,10 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateTextMessageFromAmqpValueWithNull() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setBody(new AmqpValue(null));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -350,11 +356,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
-      Message message = Message.Factory.create();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setBody(new AmqpValue(new Binary(new byte[0])));
       message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -369,11 +375,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateAmqpMapMessageFromAmqpValueWithMap() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Map<String, String> map = new HashMap<>();
       message.setBody(new AmqpValue(map));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
@@ -388,11 +394,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateAmqpStreamMessageFromAmqpValueWithList() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       List<String> list = new ArrayList<>();
       message.setBody(new AmqpValue(list));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -407,11 +413,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateAmqpStreamMessageFromAmqpSequence() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       List<String> list = new ArrayList<>();
       message.setBody(new AmqpSequence(list));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -426,11 +432,11 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateAmqpBytesMessageFromAmqpValueWithBinary() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       Binary binary = new Binary(new byte[0]);
       message.setBody(new AmqpValue(binary));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -446,10 +452,10 @@ public class JMSMappingInboundTransformerTest {
     */
    @Test
    public void testCreateBytesMessageFromAmqpValueWithUncategorisedContent() throws Exception {
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setBody(new AmqpValue(UUID.randomUUID()));
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
 
       assertNotNull("Message should not be null", jmsMessage);
       assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -458,10 +464,10 @@ public class JMSMappingInboundTransformerTest {
    @Test
    public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
       String contentString = "myTextMessageContent";
-      Message message = Message.Factory.create();
+      MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setBody(new AmqpValue(contentString));
 
-      ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+      ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
       jmsMessage.decode();
 
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@@ -504,17 +510,17 @@ public class JMSMappingInboundTransformerTest {
       throws Exception {
 
       String toAddress = "toAddress";
-      Message amqp = Message.Factory.create();
-      amqp.setBody(new AmqpValue("myTextMessageContent"));
-      amqp.setAddress(toAddress);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setBody(new AmqpValue("myTextMessageContent"));
+      message.setAddress(toAddress);
       if (toTypeAnnotationValue != null) {
          Map<Symbol, Object> map = new HashMap<>();
          map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
          MessageAnnotations ma = new MessageAnnotations(map);
-         amqp.setMessageAnnotations(ma);
+         message.setMessageAnnotations(ma);
       }
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
    }
 
@@ -549,18 +555,26 @@ public class JMSMappingInboundTransformerTest {
       throws Exception {
 
       String replyToAddress = "replyToAddress";
-      Message amqp = Message.Factory.create();
-      amqp.setBody(new AmqpValue("myTextMessageContent"));
-      amqp.setReplyTo(replyToAddress);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setBody(new AmqpValue("myTextMessageContent"));
+      message.setReplyTo(replyToAddress);
       if (replyToTypeAnnotationValue != null) {
          Map<Symbol, Object> map = new HashMap<>();
          map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
          MessageAnnotations ma = new MessageAnnotations(map);
-         amqp.setMessageAnnotations(ma);
+         message.setMessageAnnotations(ma);
       }
 
-      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
+      javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
       assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
    }
 
+   private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
+      NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
+      message.encode(encoded);
+
+      NettyReadable readable = new NettyReadable(encoded.getByteBuf());
+
+      return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 565f67b..7d573ed 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -44,6 +44,7 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
@@ -59,7 +60,6 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
 
 public class JMSMappingOutboundTransformerTest {
@@ -79,7 +79,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSMessage outbound = createMessage();
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNull(amqp.getBody());
    }
@@ -90,7 +90,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNull(amqp.getBody());
    }
@@ -104,7 +104,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -123,7 +123,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -139,7 +139,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeBytes(expectedPayload);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -159,7 +159,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSMapMessage outbound = createMapMessage();
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -174,7 +174,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setBytes("bytes", byteArray);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -196,7 +196,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setBoolean("property-3", true);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -218,7 +218,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeString("test");
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -239,7 +239,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeString("test");
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -260,7 +260,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.writeString("test");
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -279,7 +279,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSObjectMessage outbound = createObjectMessage();
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -292,7 +292,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -304,7 +304,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -321,7 +321,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -338,7 +338,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -356,7 +356,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -371,7 +371,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage();
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -384,7 +384,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -397,7 +397,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -410,7 +410,7 @@ public class JMSMappingOutboundTransformerTest {
       ServerJMSTextMessage outbound = createTextMessage(contentString);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -427,7 +427,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -445,7 +445,7 @@ public class JMSMappingOutboundTransformerTest {
       outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       outbound.encode();
 
-      Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
 
       assertNotNull(amqp.getBody());
       assertTrue(amqp.getBody() instanceof Data);
@@ -473,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSDestination(jmsDestination);
 
-      Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
 
       MessageAnnotations ma = amqp.getMessageAnnotations();
       Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@@ -506,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSReplyTo(jmsReplyTo);
 
-      Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
+      AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
 
       MessageAnnotations ma = amqp.getMessageAnnotations();
       Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@@ -518,7 +518,7 @@ public class JMSMappingOutboundTransformerTest {
       }
 
       if (jmsReplyTo != null) {
-         assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getAddress(), amqp.getReplyTo());
+         assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
index 483f245..a6f735d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -21,23 +21,27 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 /**
  * Some simple performance tests for the Message Transformers.
  */
@@ -56,10 +60,9 @@ public class JMSTransformationSpeedComparisonTest {
 
    @Test
    public void testBodyOnlyMessage() throws Exception {
-
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Proton.message();
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-      AMQPMessage encoded = new AMQPMessage(message);
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -81,8 +84,7 @@ public class JMSTransformationSpeedComparisonTest {
 
    @Test
    public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
-
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Proton.message();
 
       message.setAddress("queue://test-queue");
       message.setDeliveryCount(1);
@@ -90,7 +92,7 @@ public class JMSTransformationSpeedComparisonTest {
       message.setContentType("text/plain");
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      AMQPMessage encoded = new AMQPMessage(message);
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -112,8 +114,7 @@ public class JMSTransformationSpeedComparisonTest {
 
    @Test
    public void testTypicalQpidJMSMessage() throws Exception {
-
-      AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -136,7 +137,7 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testComplexQpidJMSMessage() throws Exception {
 
-      AMQPMessage encoded = encode(createComplexQpidJMSMessage());
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(createComplexQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -159,7 +160,7 @@ public class JMSTransformationSpeedComparisonTest {
    @Test
    public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
 
-      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -182,8 +183,7 @@ public class JMSTransformationSpeedComparisonTest {
 
    @Test
    public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
-
-      AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
+      AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
 
       // Warm up
       for (int i = 0; i < WARM_CYCLES; ++i) {
@@ -204,7 +204,7 @@ public class JMSTransformationSpeedComparisonTest {
       LOG_RESULTS(totalDuration);
    }
 
-   private Message createTypicalQpidJMSMessage() {
+   private MessageImpl createTypicalQpidJMSMessage() {
       Map<String, Object> applicationProperties = new HashMap<>();
       Map<Symbol, Object> messageAnnotations = new HashMap<>();
 
@@ -215,7 +215,7 @@ public class JMSTransformationSpeedComparisonTest {
       messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
       messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
 
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Proton.message();
 
       message.setAddress("queue://test-queue");
       message.setDeliveryCount(1);
@@ -228,7 +228,7 @@ public class JMSTransformationSpeedComparisonTest {
       return message;
    }
 
-   private Message createComplexQpidJMSMessage() {
+   private MessageImpl createComplexQpidJMSMessage() {
       Map<String, Object> applicationProperties = new HashMap<>();
       Map<Symbol, Object> messageAnnotations = new HashMap<>();
 
@@ -245,7 +245,7 @@ public class JMSTransformationSpeedComparisonTest {
       messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
       messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
 
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Proton.message();
 
       // Header Values
       message.setPriority((short) 9);
@@ -272,8 +272,13 @@ public class JMSTransformationSpeedComparisonTest {
       return message;
    }
 
-   private AMQPMessage encode(Message message) {
-      return new AMQPMessage(message);
+   private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
+      NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
+      message.encode(encoded);
+
+      NettyReadable readable = new NettyReadable(encoded.getByteBuf());
+
+      return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
    }
 
    private void encode(AMQPMessage target) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index a73d29f..9d171aa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -23,20 +28,20 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.message.Message;
-import org.junit.Before;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import io.netty.buffer.Unpooled;
 
 /**
  * Tests some basic encode / decode functionality on the transformers.
@@ -46,34 +51,32 @@ public class MessageTransformationTest {
    @Rule
    public TestName test = new TestName();
 
-   @Before
-   public void setUp() {
-   }
-
-
    @Test
    public void testBodyOnlyEncodeDecode() throws Exception {
-
-      Message incomingMessage = Proton.message();
+      MessageImpl incomingMessage = (MessageImpl) Proton.message();
 
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
-      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
+      ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
+      AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
 
       assertNull(outboudMessage.getHeader());
+
+      Section body = outboudMessage.getBody();
+      assertNotNull(body);
+      assertTrue(body instanceof AmqpValue);
+      assertTrue(((AmqpValue) body).getValue() instanceof String);
    }
 
    @Test
    public void testPropertiesButNoHeadersEncodeDecode() throws Exception {
-
-      Message incomingMessage = Proton.message();
+      MessageImpl incomingMessage = (MessageImpl) Proton.message();
 
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
 
-      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
-      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
+      ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
+      AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
 
       assertNull(outboudMessage.getHeader());
       assertNotNull(outboudMessage.getProperties());
@@ -81,20 +84,24 @@ public class MessageTransformationTest {
 
    @Test
    public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
-
-      Message incomingMessage = Proton.message();
+      MessageImpl incomingMessage = (MessageImpl) Proton.message();
 
       incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
       incomingMessage.setDurable(true);
 
-      ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
-      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
+      ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
+      AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
+
+      assertNotNull(outboudMessage.getHeader());
 
+      Section body = outboudMessage.getBody();
+      assertNotNull(body);
+      assertTrue(body instanceof AmqpValue);
+      assertTrue(((AmqpValue) body).getValue() instanceof String);
    }
 
    @Test
    public void testComplexQpidJMSMessageEncodeDecode() throws Exception {
-
       Map<String, Object> applicationProperties = new HashMap<>();
       Map<Symbol, Object> messageAnnotations = new HashMap<>();
 
@@ -113,7 +120,7 @@ public class MessageTransformationTest {
       messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0);
       messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000);
 
-      Message message = Proton.message();
+      MessageImpl message = (MessageImpl) Proton.message();
 
       // Header Values
       message.setPriority((short) 9);
@@ -137,10 +144,19 @@ public class MessageTransformationTest {
       message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
       message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
 
-      ICoreMessage core = new AMQPMessage(message).toCore();
-      Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
+      ICoreMessage core = encodeAndCreateAMQPMessage(message).toCore();
+      AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
 
       assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
       assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
    }
+
+   private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
+      NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
+      message.encode(encoded);
+
+      NettyReadable readable = new NettyReadable(encoded.getByteBuf());
+
+      return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
deleted file mode 100644
index a6a29a0..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.protocol.amqp.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Date;
-import java.util.HashMap;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
-import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.EncodingCodes;
-import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-public class AMQPMessageTest {
-
-   @Test
-   public void testVerySimple() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader( new Header());
-      Properties properties = new Properties();
-      properties.setTo("someNiceLocal");
-      protonMessage.setProperties(properties);
-      protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
-      protonMessage.getHeader().setDurable(Boolean.TRUE);
-      protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
-      assertEquals(true, decoded.getHeader().getDurable());
-      assertEquals("someNiceLocal", decoded.getAddress());
-   }
-
-   @Test
-   public void testApplicationPropertiesReencode() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader( new Header());
-      Properties properties = new Properties();
-      properties.setTo("someNiceLocal");
-      protonMessage.setProperties(properties);
-      protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
-      protonMessage.getHeader().setDurable(Boolean.TRUE);
-      HashMap<String, Object> map = new HashMap<>();
-      map.put("key", "string1");
-      protonMessage.setApplicationProperties(new ApplicationProperties(map));
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-      assertEquals("someNiceLocal", decoded.getAddress());
-
-      decoded.setAddress("newAddress");
-
-      decoded.reencode();
-      assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
-      assertEquals(true, decoded.getHeader().getDurable());
-      assertEquals("newAddress", decoded.getAddress());
-      assertEquals("string1", decoded.getObjectProperty("key"));
-
-      // validate if the message will be the same after delivery
-      AMQPMessage newDecoded = encodeDelivery(decoded, 3);
-      assertEquals(2, decoded.getHeader().getDeliveryCount().intValue());
-      assertEquals(true, newDecoded.getHeader().getDurable());
-      assertEquals("newAddress", newDecoded.getAddress());
-      assertEquals("string1", newDecoded.getObjectProperty("key"));
-   }
-
-   @Test
-   public void testGetAddressFromMessage() {
-      final String ADDRESS = "myQueue";
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setAddress(ADDRESS);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(ADDRESS, decoded.getAddress());
-   }
-
-   @Test
-   public void testGetAddressSimpleStringFromMessage() {
-      final String ADDRESS = "myQueue";
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setAddress(ADDRESS);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
-   }
-
-   @Test
-   public void testGetAddressFromMessageWithNoValueSet() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertNull(decoded.getAddress());
-      assertNull(decoded.getAddressSimpleString());
-   }
-
-   @Test
-   public void testIsDurableFromMessage() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setDurable(true);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertTrue(decoded.isDurable());
-   }
-
-   @Test
-   public void testIsDurableFromMessageWithNoValueSet() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertFalse(decoded.isDurable());
-   }
-
-   @Test
-   public void testGetGroupIDFromMessage() {
-      final String GROUP_ID = "group-1";
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setGroupId(GROUP_ID);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(GROUP_ID, decoded.getGroupID().toString());
-   }
-
-   @Test
-   public void testGetGroupIDFromMessageWithNoGroupId() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertNull(decoded.getUserID());
-   }
-
-   @Test
-   public void testGetUserIDFromMessage() {
-      final String USER_NAME = "foo";
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(USER_NAME, decoded.getAMQPUserID());
-   }
-
-   @Test
-   public void testGetUserIDFromMessageWithNoUserID() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertNull(decoded.getUserID());
-   }
-
-   @Test
-   public void testGetPriorityFromMessage() {
-      final short PRIORITY = 7;
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader(new Header());
-      protonMessage.setPriority(PRIORITY);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(PRIORITY, decoded.getPriority());
-   }
-
-   @Test
-   public void testGetPriorityFromMessageWithNoPrioritySet() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
-   }
-
-   @Test
-   public void testGetTimestampFromMessage() {
-      Date timestamp = new Date(System.currentTimeMillis());
-
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader( new Header());
-      Properties properties = new Properties();
-      properties.setCreationTime(timestamp);
-
-      protonMessage.setProperties(properties);
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(timestamp.getTime(), decoded.getTimestamp());
-   }
-
-   @Test
-   public void testGetTimestampFromMessageWithNoCreateTimeSet() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-      protonMessage.setHeader( new Header());
-
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-
-      assertEquals(0L, decoded.getTimestamp());
-   }
-
-   @Test
-   public void testExtraProperty() {
-      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
-
-      byte[] original = RandomUtil.randomBytes();
-      SimpleString name = SimpleString.toSimpleString("myProperty");
-      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
-      decoded.setAddress("someAddress");
-      decoded.setMessageID(33);
-      decoded.putExtraBytesProperty(name, original);
-
-      ICoreMessage coreMessage = decoded.toCore();
-      Assert.assertEquals(original, coreMessage.getBytesProperty(name));
-
-      ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
-      try {
-         decoded.getPersister().encode(buffer, decoded);
-         Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
-         AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
-         Assert.assertEquals(33, readMessage.getMessageID());
-         Assert.assertEquals("someAddress", readMessage.getAddress());
-         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
-      } finally {
-         buffer.release();
-      }
-
-      {
-         ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
-         AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
-         Assert.assertEquals(33, readMessage.getMessageID());
-         Assert.assertEquals("someAddress", readMessage.getAddress());
-         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
-      }
-   }
-
-   private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
-   private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
-   private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
-
-   @Test
-   public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
-      Header header = new Header();
-      header.setDurable(true);
-      header.setPriority(UnsignedByte.valueOf((byte) 6));
-
-      ByteBuf encodedBytes = Unpooled.buffer(1024);
-      NettyWritable writable = new NettyWritable(encodedBytes);
-
-      EncoderImpl encoder = TLSEncode.getEncoder();
-      encoder.setByteBuffer(writable);
-      encoder.writeObject(header);
-
-      // Signal body of AmqpValue but write corrupt underlying type info
-      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
-      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
-      encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
-      encodedBytes.writeByte(EncodingCodes.MAP8);
-      encodedBytes.writeByte(2);  // Size
-      encodedBytes.writeByte(2);  // Elements
-      // Use bad encoding code on underlying type of map key which will fail the decode if run
-      encodedBytes.writeByte(255);
-
-      ReadableBuffer readable = new NettyReadable(encodedBytes);
-
-      AMQPMessage message = null;
-      try {
-         message = new AMQPMessage(0, readable, null, null);
-      } catch (Exception decodeError) {
-         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
-      }
-
-      try {
-         // This should perform the lazy decode of the DeliveryAnnotations portion of the message
-         message.reencode();
-         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
-      } catch (Exception ex) {
-         // Expected decode to fail when building full message.
-      }
-   }
-
-   @Test
-   public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
-      Header header = new Header();
-      header.setDurable(true);
-      header.setPriority(UnsignedByte.valueOf((byte) 6));
-
-      ByteBuf encodedBytes = Unpooled.buffer(1024);
-      NettyWritable writable = new NettyWritable(encodedBytes);
-
-      EncoderImpl encoder = TLSEncode.getEncoder();
-      encoder.setByteBuffer(writable);
-      encoder.writeObject(header);
-
-      // Signal body of AmqpValue but write corrupt underlying type info
-      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
-      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
-      encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
-      // Use bad encoding code on underlying type
-      encodedBytes.writeByte(255);
-
-      ReadableBuffer readable = new NettyReadable(encodedBytes);
-
-      AMQPMessage message = null;
-      try {
-         message = new AMQPMessage(0, readable, null, null);
-      } catch (Exception decodeError) {
-         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
-      }
-
-      assertTrue(message.isDurable());
-
-      try {
-         // This should perform the lazy decode of the ApplicationProperties portion of the message
-         message.getStringProperty("test");
-         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
-      } catch (Exception ex) {
-         // Expected decode to fail when building full message.
-      }
-   }
-
-   @Test
-   public void testPartialDecodeIgnoresBodyByDefault() {
-      Header header = new Header();
-      header.setDurable(true);
-      header.setPriority(UnsignedByte.valueOf((byte) 6));
-
-      ByteBuf encodedBytes = Unpooled.buffer(1024);
-      NettyWritable writable = new NettyWritable(encodedBytes);
-
-      EncoderImpl encoder = TLSEncode.getEncoder();
-      encoder.setByteBuffer(writable);
-      encoder.writeObject(header);
-
-      // Signal body of AmqpValue but write corrupt underlying type info
-      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
-      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
-      encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
-      // Use bad encoding code on underlying type
-      encodedBytes.writeByte(255);
-
-      ReadableBuffer readable = new NettyReadable(encodedBytes);
-
-      AMQPMessage message = null;
-      try {
-         message = new AMQPMessage(0, readable, null, null);
-      } catch (Exception decodeError) {
-         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
-      }
-
-      assertTrue(message.isDurable());
-
-      try {
-         // This will decode the body section if present in order to present it as a Proton Message object
-         message.getProtonMessage();
-         fail("Should have thrown an error when attempting to decode the body which is malformed.");
-      } catch (Exception ex) {
-         // Expected decode to fail when building full message.
-      }
-   }
-
-   private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
-      ByteBuf nettyBuffer = Unpooled.buffer(1500);
-
-      message.encode(new NettyWritable(nettyBuffer));
-      byte[] bytes = new byte[nettyBuffer.writerIndex()];
-      nettyBuffer.readBytes(bytes);
-
-      return new AMQPMessage(0, bytes, null);
-   }
-
-   private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
-      ByteBuf nettyBuffer = Unpooled.buffer(1500);
-
-      message.sendBuffer(nettyBuffer, deliveryCount);
-
-      byte[] bytes = new byte[nettyBuffer.writerIndex()];
-      nettyBuffer.readBytes(bytes);
-
-      return new AMQPMessage(0, bytes, null);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
index f0de51a..3e03753 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.junit.Test;
@@ -125,6 +126,28 @@ public class NettyWritableTest {
       doPutReadableBufferTestImpl(false);
    }
 
+   @Test
+   public void testPutReadableBufferWithOffsetAndNonZeroPosition() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      ByteBuffer source = ByteBuffer.allocate(20);
+      source.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+
+      source.position(5);
+      source.limit(10);
+
+      writable.put(source);
+
+      assertEquals(5, writable.position());
+      assertEquals(5, buffer.readableBytes());
+
+      byte[] check = new byte[5];
+      buffer.readBytes(check);
+
+      assertTrue(Arrays.equals(new byte[] {5, 6, 7, 8, 9}, check));
+   }
+
    private void doPutReadableBufferTestImpl(boolean readOnly) {
       ByteBuffer buf = ByteBuffer.allocate(1024);
       buf.put((byte) 1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index b8d9bee..4529efb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -27,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@@ -50,7 +49,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
@@ -101,6 +100,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       server.stop();
       server.start();
 
+      final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
+      assertNotNull(dlqView);
+      Wait.assertEquals(1, dlqView::getMessageCount);
+
       client = createAmqpClient();
       connection = addConnection(client.connect());
       session = connection.createSession();
@@ -108,10 +111,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress());
       receiverDLQ.flow(1);
       received = receiverDLQ.receive(5, TimeUnit.SECONDS);
-      Assert.assertEquals(1, received.getTimeToLive());
-      System.out.println("received.heandler.TTL" + received.getTimeToLive());
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Value1", received.getApplicationProperty("key1"));
+
+      assertNotNull("Should have read message from DLQ", received);
+      assertEquals(0, received.getTimeToLive());
+      assertNotNull(received);
+      assertEquals("Value1", received.getApplicationProperty("key1"));
 
       connection.close();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
index 85ed04f..9516771 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
@@ -29,11 +29,16 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.buffer.Unpooled;
+
 public class MessageJournalTest extends ActiveMQTestBase {
 
    @Test
@@ -78,10 +83,8 @@ public class MessageJournalTest extends ActiveMQTestBase {
       } finally {
          journalStorageManager.getMessageJournal().stop();
       }
-
    }
 
-
    @Test
    public void testStoreAMQP() throws Throwable {
       ActiveMQServer server = createServer(true);
@@ -90,9 +93,9 @@ public class MessageJournalTest extends ActiveMQTestBase {
 
       ProtonProtocolManagerFactory factory = (ProtonProtocolManagerFactory) server.getRemotingService().getProtocolFactoryMap().get("AMQP");
 
-      Message protonJMessage = Message.Factory.create();
+      MessageImpl protonJMessage = (MessageImpl) Message.Factory.create();
 
-      AMQPMessage message = new AMQPMessage(protonJMessage);
+      AMQPMessage message = encodeAndCreateAMQPMessage(protonJMessage);
 
       message.setMessageID(333);
 
@@ -117,14 +120,19 @@ public class MessageJournalTest extends ActiveMQTestBase {
 
       try {
          journalStorageManager.getMessageJournal().start();
-
          journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
-
          Assert.assertEquals(1, committedRecords.size());
       } finally {
          journalStorageManager.getMessageJournal().stop();
       }
-
    }
 
+   private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
+      NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
+      message.encode(encoded);
+
+      NettyReadable readable = new NettyReadable(encoded.getByteBuf());
+
+      return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
+   }
 }


[4/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

Posted by cl...@apache.org.
ARTEMIS-2096 Refactor AMQMessage abstraction

Major refactoring of the AMQPMessage abstraction to resolve
some issue of message corruption still present in the code and
improve the API handling of message changes and re-encoding.

Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done.  Fixes issues of
corrupt state on copy of message or other changes in filters.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a851a8f9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a851a8f9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a851a8f9

Branch: refs/heads/master
Commit: a851a8f93f30972d252f2bff0bb3d5847cfd7b5f
Parents: 2453978
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 25 12:22:19 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 26 09:19:40 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 1602 +++++++------
 .../amqp/broker/AMQPMessagePersister.java       |    3 -
 .../amqp/converter/AMQPMessageSupport.java      |   31 +
 .../amqp/converter/AmqpCoreConverter.java       |  149 +-
 .../protocol/amqp/proton/AmqpSupport.java       |    1 -
 .../protocol/amqp/util/NettyWritable.java       |    6 +-
 .../protocol/amqp/broker/AMQPMessageTest.java   | 2231 ++++++++++++++++++
 .../amqp/converter/TestConversions.java         |   38 +-
 .../JMSMappingInboundTransformerTest.java       |  124 +-
 .../JMSMappingOutboundTransformerTest.java      |   54 +-
 .../JMSTransformationSpeedComparisonTest.java   |   47 +-
 .../message/MessageTransformationTest.java      |   68 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |  438 ----
 .../protocol/amqp/util/NettyWritableTest.java   |   23 +
 .../amqp/AmqpExpiredMessageTest.java            |   16 +-
 .../integration/journal/MessageJournalTest.java |   22 +-
 16 files changed, 3491 insertions(+), 1362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index cff5229..821356a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,9 +34,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
-import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
@@ -48,12 +49,18 @@ import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.TypeConstructor;
 import org.apache.qpid.proton.codec.WritableBuffer;
@@ -69,111 +76,283 @@ public class AMQPMessage extends RefCountMessage {
 
    public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
 
+   public static final int DEFAULT_MESSAGE_FORMAT = 0;
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
-   final long messageFormat;
-   ReadableBuffer data;
-   boolean bufferValid;
-   Boolean durable;
-   long messageID;
-   SimpleString address;
-   MessageImpl protonMessage;
+   private static final int VALUE_NOT_PRESENT = -1;
+
+   // Buffer and state for the data backing this message.
+   private ReadableBuffer data;
+   private boolean messageDataScanned;
+
+   // Marks the message as needed to be re-encoded to update the backing buffer
+   private boolean modified;
+
+   // Track locations of the message sections for later use and track the size
+   // of the header and delivery annotations if present so we can easily exclude
+   // the delivery annotations later and perform efficient encodes or copies.
+   private int headerPosition = VALUE_NOT_PRESENT;
+   private int encodedHeaderSize;
+   private int deliveryAnnotationsPosition = VALUE_NOT_PRESENT;
+   private int encodedDeliveryAnnotationsSize;
+   private int messageAnnotationsPosition = VALUE_NOT_PRESENT;
+   private int propertiesPosition = VALUE_NOT_PRESENT;
+   private int applicationPropertiesPosition = VALUE_NOT_PRESENT;
+   private int remainingBodyPosition = VALUE_NOT_PRESENT;
+
+   // Message level meta data
+   private final long messageFormat;
+   private long messageID;
+   private SimpleString address;
    private volatile int memoryEstimate = -1;
-   private long expiration = 0;
-
-   // Records where the Header section ends if present.
-   private int headerEnds = 0;
-
-   // Records where the message payload starts, ignoring DeliveryAnnotations if present
-   private int messagePaylodStart = 0;
+   private long expiration;
+   private long scheduledTime = -1;
 
-   private boolean parsedHeaders = false;
-   private Header _header;
-   private DeliveryAnnotations _deliveryAnnotations;
-   private MessageAnnotations _messageAnnotations;
-   private Properties _properties;
-   private int deliveryAnnotationsPosition = -1;
-   private int appLocation = -1;
+   // The Proton based AMQP message section that are retained in memory, these are the
+   // mutable portions of the Message as the broker sees it, although AMQP defines that
+   // the Properties and ApplicationProperties are immutable so care should be taken
+   // here when making changes to those Sections.
+   private Header header;
+   private MessageAnnotations messageAnnotations;
+   private Properties properties;
    private ApplicationProperties applicationProperties;
-   private long scheduledTime = -1;
+
    private String connectionID;
    private final CoreMessageObjectPools coreMessageObjectPools;
+   private Set<Object> rejectedConsumers;
 
-   Set<Object> rejectedConsumers;
-
-   /** These are properties set at the broker level..
-    *  these are properties created by the broker only */
+   // These are properties set at the broker level and carried only internally by broker storage.
    private volatile TypedProperties extraProperties;
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given the in Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    */
    public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties) {
       this(messageFormat, data, extraProperties, null);
    }
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given the in Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    * @param coreMessageObjectPools
+    *       Object pool used to accelerate some String operations.
+    */
    public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
-      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), extraProperties, coreMessageObjectPools);
+      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(data), extraProperties, coreMessageObjectPools);
    }
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given the in Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message in an {@link ReadableBuffer} wrapper.
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    * @param coreMessageObjectPools
+    *       Object pool used to accelerate some String operations.
+    */
    public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
       this.data = data;
       this.messageFormat = messageFormat;
-      this.bufferValid = true;
       this.coreMessageObjectPools = coreMessageObjectPools;
       this.extraProperties = extraProperties == null ? null : new TypedProperties(extraProperties);
-      parseHeaders();
+      ensureMessageDataScanned();
    }
 
-   /** for persistence reload */
-   public AMQPMessage(long messageFormat) {
+   /**
+    * Internal constructor used for persistence reload of the message.
+    * <p>
+    * The message will not be usable until the persistence mechanism populates the message
+    * data and triggers a parse of the message contents to fill in the message state.
+    *
+    * @param messageFormat
+    *       The Message format tag given the in Transfer that carried this message
+    */
+   AMQPMessage(long messageFormat) {
       this.messageFormat = messageFormat;
-      this.bufferValid = false;
+      this.modified = true;  // No buffer yet so this indicates invalid state.
       this.coreMessageObjectPools = null;
    }
 
-   public AMQPMessage(long messageFormat, Message message) {
-      this.messageFormat = messageFormat;
-      this.protonMessage = (MessageImpl) message;
-      this.bufferValid = false;
-      this.coreMessageObjectPools = null;
+   // Access to the AMQP message data using safe copies freshly decoded from the current
+   // AMQP message data stored in this message wrapper.  Changes to these values cannot
+   // be used to influence the underlying AMQP message data, the standard AMQPMessage API
+   // must be used to make changes to mutable portions of the message.
+
+   /**
+    * Creates and returns a Proton-J MessageImpl wrapper around the message data. Changes to
+    * the returned Message are not reflected in this message.
+    *
+    * @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
+    */
+   public MessageImpl getProtonMessage() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+
+      MessageImpl protonMessage = null;
+      if (data != null) {
+         protonMessage = (MessageImpl) Message.Factory.create();
+         data.rewind();
+         protonMessage.decode(data.duplicate());
+      }
+
+      return protonMessage;
    }
 
-   public AMQPMessage(Message message) {
-      this(0, message);
+   /**
+    * Returns a copy of the message Header if one is present, changes to the returned
+    * Header instance do not affect the original Message.
+    *
+    * @return a copy of the Message Header if one exists or null if none present.
+    */
+   public Header getHeader() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(headerPosition, Header.class);
    }
 
-   public MessageImpl getProtonMessage() {
-      if (protonMessage == null) {
-         protonMessage = (MessageImpl) Message.Factory.create();
+   /**
+    * Returns a copy of the MessageAnnotations in the message if present or null.  Changes to the
+    * returned DeliveryAnnotations instance do not affect the original Message.
+    *
+    * @return a copy of the {@link DeliveryAnnotations} present in the message or null if non present.
+    */
+   public DeliveryAnnotations getDeliveryAnnotations() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
+   }
 
-         if (data != null) {
-            data.rewind();
-            protonMessage.decode(data.duplicate());
-            this._header = protonMessage.getHeader();
-            protonMessage.setHeader(null);
+   /**
+    * Returns a copy of the DeliveryAnnotations in the message if present or null.  Changes to the
+    * returned MessageAnnotations instance do not affect the original Message.
+    *
+    * @return a copy of the {@link MessageAnnotations} present in the message or null if non present.
+    */
+   public MessageAnnotations getMessageAnnotations() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
+   }
+
+   /**
+    * Returns a copy of the message Properties if one is present, changes to the returned
+    * Properties instance do not affect the original Message.
+    *
+    * @return a copy of the Message Properties if one exists or null if none present.
+    */
+   public Properties getProperties() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(propertiesPosition, Properties.class);
+   }
+
+   /**
+    * Returns a copy of the {@link ApplicationProperties} present in the message if present or null.
+    * Changes to the returned MessageAnnotations instance do not affect the original Message.
+    *
+    * @return a copy of the {@link ApplicationProperties} present in the message or null if non present.
+    */
+   public ApplicationProperties getApplicationProperties() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
+   }
+
+   /**
+    * Retrieves the AMQP Section that composes the body of this message by decoding a
+    * fresh copy from the encoded message data.  Changes to the returned value are not
+    * reflected in the value encoded in the original message.
+    *
+    * @return the Section that makes up the body of this message.
+    */
+   public Section getBody() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+
+      // We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
+      // There could also be a Footer and no body so this will prevent a faulty return type in case
+      // of no body or message type we don't handle.
+      return scanForMessageSection(Math.max(0, remainingBodyPosition), AmqpSequence.class, AmqpValue.class, Data.class);
+   }
+
+   /**
+    * Retrieves the AMQP Footer encoded in the data of this message by decoding a
+    * fresh copy from the encoded message data.  Changes to the returned value are not
+    * reflected in the value encoded in the original message.
+    *
+    * @return the Footer that was encoded into this AMQP Message.
+    */
+   public Footer getFooter() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes) {
+      ensureMessageDataScanned();
+
+      // In cases where we parsed out enough to know the value is not encoded in the message
+      // we can exit without doing any reads or buffer hopping.
+      if (scanStartPosition == VALUE_NOT_PRESENT) {
+         return null;
+      }
+
+      ReadableBuffer buffer = data.duplicate().position(0);
+      final DecoderImpl decoder = TLSEncode.getDecoder();
+
+      buffer.position(scanStartPosition);
+
+      T section = null;
+
+      decoder.setBuffer(buffer);
+      try {
+         while (buffer.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            for (Class<?> type : targetTypes) {
+               if (type.equals(constructor.getTypeClass())) {
+                  section = (T) constructor.readValue();
+                  return section;
+               }
+            }
+
+            constructor.skipValue();
          }
+      } finally {
+         decoder.setBuffer(null);
       }
 
-      return protonMessage;
+      return section;
    }
 
-   private void initalizeObjects() {
-      if (protonMessage == null) {
-         if (data == null) {
-            headerEnds = 0;
-            messagePaylodStart = 0;
-            _header = new Header();
-            _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
-            _properties = new Properties();
-            applicationProperties = new ApplicationProperties(new HashMap<>());
-            protonMessage = (MessageImpl) Message.Factory.create();
-            protonMessage.setApplicationProperties(applicationProperties);
-            protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
-         }
+   private ApplicationProperties lazyDecodeApplicationProperties() {
+      if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
+         applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
       }
+
+      return applicationProperties;
    }
 
-   private Map<String, Object> getApplicationPropertiesMap() {
-      ApplicationProperties appMap = getApplicationProperties();
+   @SuppressWarnings("unchecked")
+   private Map<String, Object> getApplicationPropertiesMap(boolean createIfAbsent) {
+      ApplicationProperties appMap = lazyDecodeApplicationProperties();
       Map<String, Object> map = null;
 
       if (appMap != null) {
@@ -181,190 +360,359 @@ public class AMQPMessage extends RefCountMessage {
       }
 
       if (map == null) {
-         map = new HashMap<>();
-         this.applicationProperties = new ApplicationProperties(map);
+         if (createIfAbsent) {
+            map = new HashMap<>();
+            this.applicationProperties = new ApplicationProperties(map);
+         } else {
+            map = Collections.EMPTY_MAP;
+         }
       }
 
       return map;
    }
 
-   private ApplicationProperties getApplicationProperties() {
-      parseHeaders();
+   @SuppressWarnings("unchecked")
+   private Map<Symbol, Object> getMessageAnnotationsMap(boolean createIfAbsent) {
+      Map<Symbol, Object> map = null;
 
-      if (applicationProperties == null && appLocation >= 0) {
-         ReadableBuffer buffer = data.duplicate();
-         buffer.position(appLocation);
-         TLSEncode.getDecoder().setBuffer(buffer);
-         Object section = TLSEncode.getDecoder().readObject();
-         applicationProperties = (ApplicationProperties) section;
-         appLocation = -1;
-         TLSEncode.getDecoder().setBuffer(null);
+      if (messageAnnotations != null) {
+         map = messageAnnotations.getValue();
       }
 
-      return applicationProperties;
+      if (map == null) {
+         if (createIfAbsent) {
+            map = new HashMap<>();
+            this.messageAnnotations = new MessageAnnotations(map);
+         } else {
+            map = Collections.EMPTY_MAP;
+         }
+      }
+
+      return map;
    }
 
-   private DeliveryAnnotations getDeliveryAnnotations() {
-      parseHeaders();
+   private Object getMessageAnnotation(String annotation) {
+      return getMessageAnnotation(Symbol.getSymbol(annotation));
+   }
 
-      if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) {
-         ReadableBuffer buffer = data.duplicate();
-         buffer.position(deliveryAnnotationsPosition);
-         TLSEncode.getDecoder().setBuffer(buffer);
-         Object section = TLSEncode.getDecoder().readObject();
-         _deliveryAnnotations = (DeliveryAnnotations) section;
-         deliveryAnnotationsPosition = -1;
-         TLSEncode.getDecoder().setBuffer(null);
-      }
+   private Object getMessageAnnotation(Symbol annotation) {
+      return getMessageAnnotationsMap(false).get(annotation);
+   }
 
-      return _deliveryAnnotations;
+   private Object removeMessageAnnotation(Symbol annotation) {
+      return getMessageAnnotationsMap(false).remove(annotation);
    }
 
-   private synchronized void parseHeaders() {
-      if (!parsedHeaders) {
-         if (data == null) {
-            initalizeObjects();
-         } else {
-            partialDecode(data);
+   private void setMessageAnnotation(String annotation, Object value) {
+      setMessageAnnotation(Symbol.getSymbol(annotation), value);
+   }
+
+   private void setMessageAnnotation(Symbol annotation, Object value) {
+      getMessageAnnotationsMap(true).put(annotation, value);
+   }
+
+   // Message decoding and copying methods.  Care must be taken here to ensure the buffer and the
+   // state tracking information is kept up to data.  When the message is manually changed a forced
+   // re-encode should be done to update the backing data with the in memory elements.
+
+   private synchronized void ensureMessageDataScanned() {
+      if (!messageDataScanned) {
+         scanMessageData();
+         messageDataScanned = true;
+      }
+   }
+
+   private synchronized void scanMessageData() {
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      decoder.setBuffer(data.rewind());
+
+      header = null;
+      messageAnnotations = null;
+      properties = null;
+      applicationProperties = null;
+      expiration = 0;
+      encodedHeaderSize = 0;
+      memoryEstimate = -1;
+      scheduledTime = -1;
+      encodedDeliveryAnnotationsSize = 0;
+      headerPosition = VALUE_NOT_PRESENT;
+      deliveryAnnotationsPosition = VALUE_NOT_PRESENT;
+      propertiesPosition = VALUE_NOT_PRESENT;
+      applicationPropertiesPosition = VALUE_NOT_PRESENT;
+      remainingBodyPosition = VALUE_NOT_PRESENT;
+
+      try {
+         while (data.hasRemaining()) {
+            int constructorPos = data.position();
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (Header.class.equals(constructor.getTypeClass())) {
+               header = (Header) constructor.readValue();
+               headerPosition = constructorPos;
+               encodedHeaderSize = data.position();
+               if (header.getTtl() != null) {
+                  expiration = System.currentTimeMillis() + header.getTtl().intValue();
+               }
+            } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
+               // Don't decode these as they are not used by the broker at all and are
+               // discarded on send, mark for lazy decode if ever needed.
+               constructor.skipValue();
+               deliveryAnnotationsPosition = constructorPos;
+               encodedDeliveryAnnotationsSize = data.position() - constructorPos;
+            } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
+               messageAnnotationsPosition = constructorPos;
+               messageAnnotations = (MessageAnnotations) constructor.readValue();
+            } else if (Properties.class.equals(constructor.getTypeClass())) {
+               propertiesPosition = constructorPos;
+               properties = (Properties) constructor.readValue();
+
+               if (properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
+                  expiration = properties.getAbsoluteExpiryTime().getTime();
+               }
+            } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
+               // Lazy decoding will start at the TypeConstructor of these ApplicationProperties
+               // but we scan past it to grab the location of the possible body and footer section.
+               applicationPropertiesPosition = constructorPos;
+               constructor.skipValue();
+               remainingBodyPosition = data.hasRemaining() ? data.position() : VALUE_NOT_PRESENT;
+               break;
+            } else {
+               // This will be either the body or a Footer section which will be treated as an immutable
+               // and be copied as is when re-encoding the message.
+               remainingBodyPosition = constructorPos;
+               break;
+            }
          }
-         parsedHeaders = true;
+      } finally {
+         decoder.setByteBuffer(null);
+         data.rewind();
       }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
-      this.connectionID = connectionID;
-      return this;
+   public org.apache.activemq.artemis.api.core.Message copy() {
+      ensureDataIsValid();
+
+      ReadableBuffer view = data.duplicate().rewind();
+      byte[] newData = new byte[view.remaining()];
+
+      // Copy the full message contents with delivery annotations as they will
+      // be trimmed on send and may become useful on the broker at a later time.
+      data.get(newData);
+
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
+      newEncode.setMessageID(this.getMessageID());
+      return newEncode;
    }
 
    @Override
-   public String getConnectionID() {
-      return connectionID;
+   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      return copy().setMessageID(newID);
    }
 
-   public MessageAnnotations getMessageAnnotations() {
-      parseHeaders();
-      return _messageAnnotations;
-   }
+   // Core Message APIs for persisting and encoding of message data along with
+   // utilities for checking memory usage and encoded size characteristics.
 
-   public Header getHeader() {
-      parseHeaders();
-      return _header;
+   /**
+    * Would be called by the Artemis Core components to encode the message into
+    * the provided send buffer.  Because of how Proton message data handling works
+    * this method is not currently used by the AMQP protocol head and will not be
+    * called for out-bound sends.
+    *
+    * @see #getSendBuffer(int) for the actual method used for message sends.
+    */
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+      ensureDataIsValid();
+      NettyWritable writable = new NettyWritable(buffer);
+      writable.put(getSendBuffer(deliveryCount));
    }
 
-   public Properties getProperties() {
-      parseHeaders();
-      return _properties;
-   }
+   /**
+    * Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire.
+    * <p>
+    * When possible this method will present the bytes to the caller without copying them into
+    * a new buffer copy.  If copying is needed a new Netty buffer is created and returned. The
+    * caller should ensure that the reference count on the returned buffer is always decremented
+    * to avoid a leak in the case of a copied buffer being returned.
+    *
+    * @param deliveryCount
+    *       The new delivery count for this message.
+    *
+    * @return a Netty ByteBuf containing the encoded bytes of this Message instance.
+    */
+   public ReadableBuffer getSendBuffer(int deliveryCount) {
+      ensureDataIsValid();
 
-   private Object getSymbol(String symbol) {
-      return getSymbol(Symbol.getSymbol(symbol));
+      if (deliveryCount > 1) {
+         return createCopyWithNewDeliveryCount(deliveryCount);
+      } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT) {
+         return createCopyWithoutDeliveryAnnotations();
+      } else {
+         // Common case message has no delivery annotations and this is the first delivery
+         // so no re-encoding or section skipping needed.
+         return data.duplicate();
+      }
    }
 
-   private Object getSymbol(Symbol symbol) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         return mapAnnotations.get(symbol);
-      }
+   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
+      assert deliveryAnnotationsPosition != VALUE_NOT_PRESENT;
 
-      return null;
-   }
+      // The original message had delivery annotations and so we must copy into a new
+      // buffer skipping the delivery annotations section as that is not meant to survive
+      // beyond this hop.
+      ReadableBuffer duplicate = data.duplicate();
 
-   private Object removeSymbol(Symbol symbol) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         return mapAnnotations.remove(symbol);
-      }
+      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+      result.writeBytes(duplicate.limit(encodedHeaderSize).byteBuffer());
+      duplicate.clear();
+      duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
+      result.writeBytes(duplicate.byteBuffer());
 
-      return null;
+      return new NettyReadable(result);
    }
 
-   private void setSymbol(String symbol, Object value) {
-      setSymbol(Symbol.getSymbol(symbol), value);
-   }
+   private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
+      assert deliveryCount > 1;
 
-   private void setSymbol(Symbol symbol, Object value) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      if (annotations == null) {
-         _messageAnnotations = new MessageAnnotations(new HashMap<>());
-         annotations = _messageAnnotations;
-      }
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         mapAnnotations.put(symbol, value);
+      final int amqpDeliveryCount = deliveryCount - 1;
+
+      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+
+      // If this is re-delivering the message then the header must be re-encoded
+      // otherwise we want to write the original header if present.  When a
+      // Header is present we need to copy it as we are updating the re-delivered
+      // message and not the stored version which we don't want to invalidate here.
+      Header header = this.header;
+      if (header == null) {
+         header = new Header();
+      } else {
+         header = new Header(header);
       }
+
+      header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
+      TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
+      TLSEncode.getEncoder().writeObject(header);
+      TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+
+      // This will skip any existing delivery annotations that might have been present
+      // in the original message.
+      data.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
+      result.writeBytes(data.byteBuffer());
+      data.position(0);
+
+      return new NettyReadable(result);
    }
 
    @Override
-   public RoutingType getRoutingType() {
-      Object routingType = getSymbol(AMQPMessageSupport.ROUTING_TYPE);
+   public void messageChanged() {
+      modified = true;
+   }
 
-      if (routingType != null) {
-         return RoutingType.getType((byte) routingType);
+   @Override
+   public ByteBuf getBuffer() {
+      if (data == null) {
+         return null;
       } else {
-         routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
-         if (routingType != null) {
-            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
-               return RoutingType.ANYCAST;
-            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
-               return RoutingType.MULTICAST;
-            }
+         if (data instanceof NettyReadable) {
+            return ((NettyReadable) data).getByteBuf();
          } else {
-            return null;
+            return Unpooled.wrappedBuffer(data.byteBuffer());
          }
+      }
+   }
 
-         return null;
+   @Override
+   public AMQPMessage setBuffer(ByteBuf buffer) {
+      // If this is ever called we would be in a highly unfortunate state
+      this.data = null;
+      return this;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      ensureDataIsValid();
+      // The encoded size will exclude any delivery annotations that are present as we will clip them.
+      return data.remaining() - encodedDeliveryAnnotationsSize;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+      // Not used for AMQP messages.
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      if (memoryEstimate == -1) {
+         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
       }
+
+      return memoryEstimate;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
-      parseHeaders();
-      if (routingType == null) {
-         removeSymbol(AMQPMessageSupport.ROUTING_TYPE);
-      } else {
-         setSymbol(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
+      try {
+         return AmqpCoreConverter.toCore(
+            this, coreMessageObjectPools, header, messageAnnotations, properties, lazyDecodeApplicationProperties(), getBody(), getFooter());
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return this;
    }
 
    @Override
-   public SimpleString getGroupID() {
-      parseHeaders();
+   public ICoreMessage toCore() {
+      return toCore(coreMessageObjectPools);
+   }
 
-      if (_properties != null && _properties.getGroupId() != null) {
-         return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      ensureDataIsValid();
+      targetRecord.writeInt(internalPersistSize());
+      if (data.hasArray()) {
+         targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
       } else {
-         return null;
+         targetRecord.writeBytes(data.byteBuffer());
       }
    }
 
    @Override
-   public Long getScheduledDeliveryTime() {
+   public int getPersistSize() {
+      ensureDataIsValid();
+      return DataConstants.SIZE_INT + internalPersistSize();
+   }
 
-      if (scheduledTime < 0) {
-         Object objscheduledTime = getSymbol("x-opt-delivery-time");
-         Object objdelay = getSymbol("x-opt-delivery-delay");
+   private int internalPersistSize() {
+      return data.remaining();
+   }
 
-         if (objscheduledTime != null && objscheduledTime instanceof Number) {
-            this.scheduledTime = ((Number) objscheduledTime).longValue();
-         } else if (objdelay != null && objdelay instanceof Number) {
-            this.scheduledTime = System.currentTimeMillis() + ((Number) objdelay).longValue();
-         } else {
-            this.scheduledTime = 0;
-         }
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      byte[] recordArray = new byte[size];
+      record.readBytes(recordArray);
+      data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
+
+      // Message state is now that the underlying buffer is loaded but the contents
+      // not yet scanned, once done the message is fully populated and ready for dispatch.
+      // Force a scan now and tidy the state variables to reflect where we are following
+      // this reload from the store.
+      scanMessageData();
+      messageDataScanned = true;
+      modified = false;
+
+      // Message state should reflect that is came from persistent storage which
+      // can happen when moved to a durable location.  We must re-encode here to
+      // avoid a subsequent redelivery from suddenly appearing with a durable header
+      // tag when the initial delivery did not.
+      if (!isDurable()) {
+         setDurable(true);
+         reencode();
       }
-
-      return scheduledTime;
    }
 
    @Override
-   public AMQPMessage setScheduledDeliveryTime(Long time) {
-      parseHeaders();
-      setSymbol(AMQPMessageSupport.JMS_DELIVERY_TIME, time);
-      return this;
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
    }
 
    @Override
@@ -373,136 +721,143 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public synchronized boolean acceptsConsumer(long consumer) {
-      if (rejectedConsumers == null) {
-         return true;
-      } else {
-         return !rejectedConsumers.contains(consumer);
-      }
-   }
+   public void reencode() {
+      ensureMessageDataScanned();
 
-   @Override
-   public synchronized void rejectConsumer(long consumer) {
-      if (rejectedConsumers == null) {
-         rejectedConsumers = new HashSet<>();
+      // The address was updated on a message with Properties so we update them
+      // for cases where there are no properties we aren't adding a properties
+      // section which seems wrong but this preserves previous behavior.
+      if (properties != null && address != null) {
+         properties.setTo(address.toString());
       }
 
-      rejectedConsumers.add(consumer);
+      encodeMessage();
+      scanMessageData();
+
+      messageDataScanned = true;
+      modified = false;
    }
 
-   private synchronized void partialDecode(ReadableBuffer buffer) {
-      DecoderImpl decoder = TLSEncode.getDecoder();
-      decoder.setBuffer(buffer.rewind());
+   private synchronized void ensureDataIsValid() {
+      assert data != null;
 
-      _header = null;
-      expiration = 0;
-      headerEnds = 0;
-      messagePaylodStart = 0;
-      _deliveryAnnotations = null;
-      _messageAnnotations = null;
-      _properties = null;
-      applicationProperties = null;
-      appLocation = -1;
-      deliveryAnnotationsPosition = -1;
+      if (modified) {
+         encodeMessage();
+         modified = false;
+      }
+   }
+
+   private synchronized void encodeMessage() {
+      int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
+      EncoderImpl encoder = TLSEncode.getEncoder();
 
       try {
-         while (buffer.hasRemaining()) {
-            int constructorPos = buffer.position();
-            TypeConstructor<?> constructor = decoder.readConstructor();
-            if (Header.class.equals(constructor.getTypeClass())) {
-               _header = (Header) constructor.readValue();
-               headerEnds = messagePaylodStart = buffer.position();
-               durable = _header.getDurable();
-               if (_header.getTtl() != null) {
-                  expiration = System.currentTimeMillis() + _header.getTtl().intValue();
-               }
-            } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
-               // Don't decode these as they are not used by the broker at all and are
-               // discarded on send, mark for lazy decode if ever needed.
-               constructor.skipValue();
-               deliveryAnnotationsPosition = constructorPos;
-               messagePaylodStart = buffer.position();
-            } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
-               _messageAnnotations = (MessageAnnotations) constructor.readValue();
-            } else if (Properties.class.equals(constructor.getTypeClass())) {
-               _properties = (Properties) constructor.readValue();
+         NettyWritable writable = new NettyWritable(buffer);
 
-               if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
-                  expiration = _properties.getAbsoluteExpiryTime().getTime();
-               }
+         encoder.setByteBuffer(writable);
+         if (header != null) {
+            encoder.writeObject(header);
+         }
 
-               // Next is either Application Properties or the rest of the message, leave it for
-               // lazy decode of the ApplicationProperties should there be any.  Check first though
-               // as we don't want to actually decode the body which could be expensive.
-               if (buffer.hasRemaining()) {
-                  constructor = decoder.peekConstructor();
-                  if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
-                     appLocation = buffer.position();
-                  }
-               }
-               break;
-            } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
-               // Lazy decoding will start at the TypeConstructor of these ApplicationProperties
-               appLocation = constructorPos;
-               break;
-            } else {
-               break;
+         // We currently do not encode any delivery annotations but it is conceivable
+         // that at some point they may need to be preserved, this is where that needs
+         // to happen.
+
+         if (messageAnnotations != null) {
+            encoder.writeObject(messageAnnotations);
+         }
+         if (properties != null) {
+            encoder.writeObject(properties);
+         }
+
+         // Whenever possible avoid encoding sections we don't need to by
+         // checking if application properties where loaded or added and
+         // encoding only in that case.
+         if (applicationProperties != null) {
+            encoder.writeObject(applicationProperties);
+
+            // Now raw write the remainder body and footer if present.
+            if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+               writable.put(data.position(remainingBodyPosition));
             }
+         } else if (applicationPropertiesPosition != VALUE_NOT_PRESENT) {
+            // Writes out ApplicationProperties, Body and Footer in one go if present.
+            writable.put(data.position(applicationPropertiesPosition));
+         } else if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+            // No Application properties at all so raw write Body and Footer sections
+            writable.put(data.position(remainingBodyPosition));
          }
+
+         byte[] bytes = new byte[buffer.writerIndex()];
+
+         buffer.readBytes(bytes);
+         data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
       } finally {
-         decoder.setByteBuffer(null);
-         buffer.position(0);
+         encoder.setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   // These methods interact with the Extra Properties that can accompany the message but
+   // because these are not sent on the wire, update to these does not force a re-encode on
+   // send of the message.
+
+   public TypedProperties createExtraProperties() {
+      if (extraProperties == null) {
+         extraProperties = new TypedProperties();
       }
+      return extraProperties;
    }
 
-   public long getMessageFormat() {
-      return messageFormat;
+   public TypedProperties getExtraProperties() {
+      return extraProperties;
+   }
+
+   public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
+      this.extraProperties = extraProperties;
+      return this;
    }
 
    @Override
-   public void messageChanged() {
-      bufferValid = false;
-      this.data = null;
+   public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
+      createExtraProperties().putBytesProperty(key, value);
+      return this;
    }
 
    @Override
-   public ByteBuf getBuffer() {
-      if (data == null) {
+   public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
          return null;
       } else {
-         return Unpooled.wrappedBuffer(data.byteBuffer());
+         return extraProperties.getBytesProperty(key);
       }
    }
 
    @Override
-   public AMQPMessage setBuffer(ByteBuf buffer) {
-      this.data = null;
-      return this;
+   public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
+         return null;
+      } else {
+         return (byte[])extraProperties.removeProperty(key);
+      }
    }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message copy() {
-      checkBuffer();
-
-      ReadableBuffer view = data.duplicate();
+   // Message meta data access for Core and AMQP usage.
 
-      byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)];
-
-      view.position(0).limit(headerEnds);
-      view.get(newData, 0, headerEnds);
-      view.clear();
-      view.position(messagePaylodStart);
-      view.get(newData, headerEnds, view.remaining());
-
-      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
-      newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
-      return newEncode;
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
+      this.connectionID = connectionID;
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
-      checkBuffer();
-      return copy().setMessageID(newID);
+   public String getConnectionID() {
+      return connectionID;
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
    }
 
    @Override
@@ -523,23 +878,31 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public AMQPMessage setExpiration(long expiration) {
-
-      Properties properties = getProperties();
-
       if (properties != null) {
          if (expiration <= 0) {
             properties.setAbsoluteExpiryTime(null);
          } else {
             properties.setAbsoluteExpiryTime(new Date(expiration));
          }
+      } else if (expiration > 0) {
+         properties = new Properties();
+         properties.setAbsoluteExpiryTime(new Date(expiration));
+      }
+
+      // We are overriding expiration with an Absolute expiration time so any
+      // previous Header based TTL also needs to be removed.
+      if (header != null) {
+         header.setTtl(null);
       }
-      this.expiration = expiration;
+
+      this.expiration = Math.max(0, expiration);
+
       return this;
    }
 
    @Override
    public Object getUserID() {
-      Properties properties = getProperties();
+      // User ID in Artemis API means Message ID
       if (properties != null && properties.getMessageId() != null) {
          return properties.getMessageId();
       } else {
@@ -548,14 +911,14 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    /**
-    * Before we added AMQP into Artemis / Hornetq, the name getUserID was already taken by JMSMessageID.
+    * Before we added AMQP into Artemis the name getUserID was already taken by JMSMessageID.
     * We cannot simply change the names now as it would break the API for existing clients.
     *
     * This is to return and read the proper AMQP userID.
-    * @return
+    *
+    * @return the UserID value in the AMQP Properties if one is present.
     */
    public Object getAMQPUserID() {
-      Properties properties = getProperties();
       if (properties != null && properties.getUserId() != null) {
          Binary binary = properties.getUserId();
          return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
@@ -570,29 +933,27 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public boolean isDurable() {
-      if (durable != null) {
-         return durable;
-      }
-
-      parseHeaders();
-
-      if (getHeader() != null && getHeader().getDurable() != null) {
-         durable = getHeader().getDurable();
-         return durable;
-      } else {
-         return durable != null ? durable : false;
-      }
+   public Object getDuplicateProperty() {
+      return null;
    }
 
    @Override
-   public Object getDuplicateProperty() {
-      return null;
+   public boolean isDurable() {
+      if (header != null && header.getDurable() != null) {
+         return header.getDurable();
+      } else {
+         return false;
+      }
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
-      this.durable = durable;
+      if (header == null) {
+         header = new Header();
+      }
+
+      header.setDurable(durable);  // Message needs to be re-encoded following this action.
+
       return this;
    }
 
@@ -602,11 +963,6 @@ public class AMQPMessage extends RefCountMessage {
       return addressSimpleString == null ? null : addressSimpleString.toString();
    }
 
-
-   public SimpleString cachedAddressSimpleString(String address) {
-      return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
-   }
-
    @Override
    public AMQPMessage setAddress(String address) {
       setAddress(cachedAddressSimpleString(address));
@@ -632,7 +988,6 @@ public class AMQPMessage extends RefCountMessage {
 
          if (address == null) {
             // if it still null, it will look for the address on the getTo();
-            Properties properties = getProperties();
             if (properties != null && properties.getTo() != null) {
                address = cachedAddressSimpleString(properties.getTo());
             }
@@ -641,10 +996,14 @@ public class AMQPMessage extends RefCountMessage {
       return address;
    }
 
+   private SimpleString cachedAddressSimpleString(String address) {
+      return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
+   }
+
    @Override
    public long getTimestamp() {
-      if (getProperties() != null && getProperties().getCreationTime() != null) {
-         return getProperties().getCreationTime().getTime();
+      if (properties != null && properties.getCreationTime() != null) {
+         return properties.getCreationTime().getTime();
       } else {
          return 0L;
       }
@@ -652,14 +1011,17 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
-      getProperties().setCreationTime(new Date(timestamp));
+      if (properties == null) {
+         properties = new Properties();
+      }
+      properties.setCreationTime(new Date(timestamp));
       return this;
    }
 
    @Override
    public byte getPriority() {
-      if (getHeader() != null && getHeader().getPriority() != null) {
-         return (byte) Math.min(getHeader().getPriority().intValue(), MAX_MESSAGE_PRIORITY);
+      if (header != null && header.getPriority() != null) {
+         return (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
       } else {
          return DEFAULT_MESSAGE_PRIORITY;
       }
@@ -667,349 +1029,180 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
-      getHeader().setPriority(UnsignedByte.valueOf(priority));
-      return this;
-   }
-
-   @Override
-   public void receiveBuffer(ByteBuf buffer) {
-
-   }
-
-   private synchronized void checkBuffer() {
-      if (!bufferValid) {
-         encodeProtonMessage();
-      }
-   }
-
-   private void encodeProtonMessage() {
-      int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
-      try {
-         getProtonMessage().encode(new NettyWritable(buffer));
-         byte[] bytes = new byte[buffer.writerIndex()];
-         buffer.readBytes(bytes);
-         data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
-         bufferValid = true;
-      } finally {
-         buffer.release();
-      }
-   }
-
-   @Override
-   public int getEncodeSize() {
-      checkBuffer();
-      // + 20checkBuffer is an estimate for the Header with the deliveryCount
-      return data.remaining() - messagePaylodStart + 20;
-   }
-
-   @Override
-   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
-      checkBuffer();
-
-      int amqpDeliveryCount = deliveryCount - 1;
-
-      // If the re-delivering the message then the header must be re-encoded
-      // otherwise we want to write the original header if present.
-      if (amqpDeliveryCount > 0) {
-
-         Header header = getHeader();
-         if (header == null) {
-            header = new Header();
-            header.setDurable(durable);
-         }
-
-         synchronized (header) {
-            header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
-            TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
-            TLSEncode.getEncoder().writeObject(header);
-            TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
-         }
-      } else if (headerEnds > 0) {
-         buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer());
-      }
-
-      data.position(messagePaylodStart);
-      buffer.writeBytes(data.byteBuffer());
-      data.position(0);
-   }
-
-   /**
-    * Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire.
-    * <p>
-    * When possible this method will present the bytes to the caller without copying them into
-    * another buffer copy.  If copying is needed a new Netty buffer is created and returned. The
-    * caller should ensure that the reference count on the returned buffer is always decremented
-    * to avoid a leak in the case of a copied buffer being returned.
-    *
-    * @param deliveryCount
-    *       The new delivery count for this message.
-    *
-    * @return a Netty ByteBuf containing the encoded bytes of this Message instance.
-    */
-   public ReadableBuffer getSendBuffer(int deliveryCount) {
-      checkBuffer();
-
-      if (deliveryCount > 1) {
-         return createCopyWithNewDeliveryCount(deliveryCount);
-      } else if (headerEnds != messagePaylodStart) {
-         return createCopyWithoutDeliveryAnnotations();
-      } else {
-         // Common case message has no delivery annotations and this is the first delivery
-         // so no re-encoding or section skipping needed.
-         return data.duplicate();
-      }
-   }
-
-   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
-      assert headerEnds != messagePaylodStart;
-
-      // The original message had delivery annotations and so we must copy into a new
-      // buffer skipping the delivery annotations section as that is not meant to survive
-      // beyond this hop.
-      ReadableBuffer duplicate = data.duplicate();
-
-      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
-      result.writeBytes(duplicate.limit(headerEnds).byteBuffer());
-      duplicate.clear();
-      duplicate.position(messagePaylodStart);
-      result.writeBytes(duplicate.byteBuffer());
-
-      return new NettyReadable(result);
-   }
-
-   private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
-      assert deliveryCount > 1;
-
-      final int amqpDeliveryCount = deliveryCount - 1;
-      // If the re-delivering the message then the header must be re-encoded
-      // (or created if not previously present).  Any delivery annotations should
-      // be skipped as well in the resulting buffer.
-
-      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
-
-      Header header = getHeader();
       if (header == null) {
          header = new Header();
-         header.setDurable(durable);
-      }
-
-      synchronized (header) {
-         // Updates or adds a Header section with the correct delivery count
-         header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
-         TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
-         TLSEncode.getEncoder().writeObject(header);
-         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
-      }
-
-      // This will skip any existing delivery annotations that might have been present
-      // in the original message.
-      data.position(messagePaylodStart);
-      result.writeBytes(data.byteBuffer());
-      data.position(0);
-
-      return new NettyReadable(result);
-   }
-
-   public TypedProperties createExtraProperties() {
-      if (extraProperties == null) {
-         extraProperties = new TypedProperties();
       }
-      return extraProperties;
-   }
-
-   public TypedProperties getExtraProperties() {
-      return extraProperties;
-   }
-
-   public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
-      this.extraProperties = extraProperties;
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
-      createExtraProperties().putBytesProperty(key, value);
+      header.setPriority(UnsignedByte.valueOf(priority));
       return this;
    }
 
    @Override
-   public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
-      if (extraProperties == null) {
-         return null;
+   public SimpleString getReplyTo() {
+      if (properties != null) {
+         return SimpleString.toSimpleString(properties.getReplyTo());
       } else {
-         return extraProperties.getBytesProperty(key);
-      }
-   }
-
-   @Override
-   public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
-      if (extraProperties == null) {
          return null;
-      } else {
-         return (byte[])extraProperties.removeProperty(key);
       }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
-      getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
-      getApplicationPropertiesMap().put(key, Byte.valueOf(value));
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
-      getApplicationPropertiesMap().put(key, value);
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
-      getApplicationPropertiesMap().put(key, Short.valueOf(value));
-      return this;
-   }
+   public AMQPMessage setReplyTo(SimpleString address) {
+      if (properties == null) {
+         properties = new Properties();
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
-      getApplicationPropertiesMap().put(key, Character.valueOf(value));
+      properties.setReplyTo(address != null ? address.toString() : null);
       return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
-      getApplicationPropertiesMap().put(key, Integer.valueOf(value));
-      return this;
-   }
+   public RoutingType getRoutingType() {
+      Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
-      getApplicationPropertiesMap().put(key, Long.valueOf(value));
-      return this;
-   }
+      if (routingType != null) {
+         return RoutingType.getType((byte) routingType);
+      } else {
+         routingType = getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+         if (routingType != null) {
+            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
+               return RoutingType.ANYCAST;
+            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
+               return RoutingType.MULTICAST;
+            }
+         } else {
+            return null;
+         }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
-      getApplicationPropertiesMap().put(key, Float.valueOf(value));
-      return this;
+         return null;
+      }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
-      getApplicationPropertiesMap().put(key, Double.valueOf(value));
+   public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
+      if (routingType == null) {
+         removeMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
+      } else {
+         setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
+      }
       return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
-      getApplicationPropertiesMap().put(key.toString(), Boolean.valueOf(value));
-      return this;
-   }
+   public SimpleString getGroupID() {
+      ensureMessageDataScanned();
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
-      return putByteProperty(key.toString(), value);
+      if (properties != null && properties.getGroupId() != null) {
+         return SimpleString.toSimpleString(properties.getGroupId(),
+            coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
+      } else {
+         return null;
+      }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
-      return putBytesProperty(key.toString(), value);
-   }
+   public Long getScheduledDeliveryTime() {
+      if (scheduledTime < 0) {
+         Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
+         Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
-      return putShortProperty(key.toString(), value);
-   }
+         if (objscheduledTime != null && objscheduledTime instanceof Number) {
+            this.scheduledTime = ((Number) objscheduledTime).longValue();
+         } else if (objdelay != null && objdelay instanceof Number) {
+            this.scheduledTime = System.currentTimeMillis() + ((Number) objdelay).longValue();
+         } else {
+            this.scheduledTime = 0;
+         }
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
-      return putCharProperty(key.toString(), value);
+      return scheduledTime;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
-      return putIntProperty(key.toString(), value);
-   }
+   public AMQPMessage setScheduledDeliveryTime(Long time) {
+      if (time != null && time.longValue() > 0) {
+         setMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, time);
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
+         scheduledTime = time;
+      } else {
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
+         scheduledTime = 0;
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
-      return putLongProperty(key.toString(), value);
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
-      return putFloatProperty(key.toString(), value);
+   public Object removeAnnotation(SimpleString key) {
+      return removeMessageAnnotation(Symbol.getSymbol(key.toString()));
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
-      return putDoubleProperty(key.toString(), value);
+   public Object getAnnotation(SimpleString key) {
+      return getMessageAnnotation(key.toString());
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
-      getApplicationPropertiesMap().put(key, value);
+   public AMQPMessage setAnnotation(SimpleString key, Object value) {
+      setMessageAnnotation(key.toString(), value);
       return this;
    }
 
+   // JMS Style property access methods.  These can result in additional decode of AMQP message
+   // data from Application properties.  Updates to application properties puts the message in a
+   // dirty state and requires a re-encode of the data to update all buffer state data otherwise
+   // the next send of the Message will not include changes made here.
+
    @Override
-   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key,
-                                                                         Object value) throws ActiveMQPropertyConversionException {
-      getApplicationPropertiesMap().put(key, value);
-      return this;
+   public Object removeProperty(SimpleString key) {
+      return removeProperty(key.toString());
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key,
-                                                                         Object value) throws ActiveMQPropertyConversionException {
-      return putObjectProperty(key.toString(), value);
+   public Object removeProperty(String key) {
+      return getApplicationPropertiesMap(false).remove(key);
    }
 
    @Override
-   public Object removeProperty(String key) {
-      return getApplicationPropertiesMap().remove(key);
+   public boolean containsProperty(SimpleString key) {
+      return containsProperty(key.toString());
    }
 
    @Override
    public boolean containsProperty(String key) {
-      return getApplicationPropertiesMap().containsKey(key);
+      return getApplicationPropertiesMap(false).containsKey(key);
    }
 
    @Override
    public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Boolean) getApplicationPropertiesMap().get(key);
+      return (Boolean) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Byte) getApplicationPropertiesMap().get(key);
+      return (Byte) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Double) getApplicationPropertiesMap().get(key);
+      return (Double) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Integer) getApplicationPropertiesMap().get(key);
+      return (Integer) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Long) getApplicationPropertiesMap().get(key);
+      return (Long) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Object getObjectProperty(String key) {
       if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
-         if (getProperties() != null) {
-            return getProperties().getSubject();
+         if (properties != null) {
+            return properties.getSubject();
          }
       } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
@@ -1018,11 +1211,11 @@ public class AMQPMessage extends RefCountMessage {
       } else if (key.equals(MessageUtil.JMSXUSERID)) {
          return getAMQPUserID();
       } else if (key.equals(MessageUtil.CORRELATIONID_HEADER_NAME.toString())) {
-         if (getProperties() != null && getProperties().getCorrelationId() != null) {
-            return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(getProperties().getCorrelationId());
+         if (properties != null && properties.getCorrelationId() != null) {
+            return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
          }
       } else {
-         Object value = getApplicationPropertiesMap().get(key);
+         Object value = getApplicationPropertiesMap(false).get(key);
          if (value instanceof UnsignedInteger ||
              value instanceof UnsignedByte ||
              value instanceof UnsignedLong ||
@@ -1038,78 +1231,32 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Short) getApplicationPropertiesMap().get(key);
+      return (Short) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Float) getApplicationPropertiesMap().get(key);
+      return (Float) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
       if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
-         return getProperties().getSubject();
+         return properties.getSubject();
       } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
       } else {
-         return (String) getApplicationPropertiesMap().get(key);
+         return (String) getApplicationPropertiesMap(false).get(key);
       }
    }
 
    @Override
-   public Object removeAnnotation(SimpleString key) {
-      return removeSymbol(Symbol.getSymbol(key.toString()));
-   }
-
-   @Override
-   public Object getAnnotation(SimpleString key) {
-      return getSymbol(key.toString());
-   }
-
-   @Override
-   public AMQPMessage setAnnotation(SimpleString key, Object value) {
-      setSymbol(key.toString(), value);
-      return this;
-   }
-
-   @Override
-   public void reencode() {
-      parseHeaders();
-      getApplicationProperties();
-      getDeliveryAnnotations();
-      if (_header != null) getProtonMessage().setHeader(_header);
-      if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
-      if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations);
-      if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
-      if (_properties != null) {
-         if (address != null) {
-            _properties.setTo(address.toString());
-         }
-         getProtonMessage().setProperties(this._properties);
+   public Set<SimpleString> getPropertyNames() {
+      HashSet<SimpleString> values = new HashSet<>();
+      for (Object k : getApplicationPropertiesMap(false).keySet()) {
+         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
       }
-      bufferValid = false;
-      checkBuffer();
-   }
-
-   @Override
-   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
-      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
-   }
-
-   @Override
-   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
-      return (byte[]) getApplicationPropertiesMap().get(key);
-   }
-
-   @Override
-   public Object removeProperty(SimpleString key) {
-      return removeProperty(key.toString());
-   }
-
-   @Override
-   public boolean containsProperty(SimpleString key) {
-      return containsProperty(key.toString());
+      return values;
    }
 
    @Override
@@ -1123,6 +1270,11 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return (byte[]) getApplicationPropertiesMap(false).get(key);
+   }
+
+   @Override
    public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
       return getDoubleProperty(key.toString());
    }
@@ -1166,107 +1318,150 @@ public class AMQPMessage extends RefCountMessage {
    public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
       return getBytesProperty(key.toString());
    }
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return SimpleString.toSimpleString((String) getApplicationPropertiesMap(false).get(key), getPropertyValuesPool());
+   }
+
+   // Core Message Application Property update methods, calling these puts the message in a dirty
+   // state and requires a re-encode of the data to update all buffer state data.  If no re-encode
+   // is done prior to the next dispatch the old view of the message will be sent.
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
-      return putStringProperty(key.toString(), value.toString());
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
+      getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
-      return putStringProperty(key.toString(), value);
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
+      getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
+      return this;
    }
 
    @Override
-   public Set<SimpleString> getPropertyNames() {
-      HashSet<SimpleString> values = new HashSet<>();
-      for (Object k : getApplicationPropertiesMap().keySet()) {
-         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
-      }
-      return values;
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
+      getApplicationPropertiesMap(true).put(key, value);
+      return this;
    }
 
    @Override
-   public int getMemoryEstimate() {
-      if (memoryEstimate == -1) {
-         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
-      }
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
+      getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
+      return this;
+   }
 
-      return memoryEstimate;
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
+      getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
+      return this;
    }
 
    @Override
-   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
-      try {
-         return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
-      } catch (Exception e) {
-         throw new RuntimeException(e.getMessage(), e);
-      }
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
+      getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
+      return this;
    }
 
    @Override
-   public ICoreMessage toCore() {
-      return toCore(null);
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
+      getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
+      return this;
    }
 
    @Override
-   public SimpleString getLastValueProperty() {
-      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
+      getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) {
-      return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
+      getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
+      return this;
    }
 
    @Override
-   public SimpleString getReplyTo() {
-      if (getProperties() != null) {
-         return SimpleString.toSimpleString(getProperties().getReplyTo());
-      } else {
-         return null;
-      }
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
+      getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value));
+      return this;
    }
 
    @Override
-   public AMQPMessage setReplyTo(SimpleString address) {
-      if (getProperties() != null) {
-         getProperties().setReplyTo(address != null ? address.toString() : null);
-      }
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
+      return putByteProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
+      return putBytesProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
+      return putShortProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
+      return putCharProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
+      return putIntProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
+      return putLongProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
+      return putFloatProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
+      return putDoubleProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
+      getApplicationPropertiesMap(true).put(key, value);
       return this;
    }
 
    @Override
-   public int getPersistSize() {
-      checkBuffer();
-      return DataConstants.SIZE_INT + internalPersistSize();
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
+      getApplicationPropertiesMap(true).put(key, value);
+      return this;
    }
 
-   private int internalPersistSize() {
-      return data.remaining();
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
+      return putObjectProperty(key.toString(), value);
    }
 
    @Override
-   public void persist(ActiveMQBuffer targetRecord) {
-      checkBuffer();
-      targetRecord.writeInt(internalPersistSize());
-      if (data.hasArray()) {
-         targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
-      } else {
-         targetRecord.writeBytes(data.byteBuffer());
-      }
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
+      return putStringProperty(key.toString(), value.toString());
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
-      int size = record.readInt();
-      byte[] recordArray = new byte[size];
-      record.readBytes(recordArray);
-      this.messagePaylodStart = 0; // whatever was persisted will be sent
-      this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
-      this.bufferValid = true;
-      this.durable = true; // it's coming from the journal, so it's durable
-      parseHeaders();
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
+      return putStringProperty(key.toString(), value);
+   }
+
+   @Override
+   public SimpleString getLastValueProperty() {
+      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) {
+      return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
    }
 
    @Override
@@ -1275,12 +1470,30 @@ public class AMQPMessage extends RefCountMessage {
          ", messageID=" + getMessageID() +
          ", address=" + getAddress() +
          ", size=" + getEncodeSize() +
-         ", applicationProperties=" + getApplicationProperties() +
-         ", properties=" + getProperties() +
+         ", applicationProperties=" + applicationProperties +
+         ", properties=" + properties +
          ", extraProperties = " + getExtraProperties() +
          "]";
    }
 
+   @Override
+   public synchronized boolean acceptsConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         return true;
+      } else {
+         return !rejectedConsumers.contains(consumer);
+      }
+   }
+
+   @Override
+   public synchronized void rejectConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         rejectedConsumers = new HashSet<>();
+      }
+
+      rejectedConsumers.add(consumer);
+   }
+
    private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
    }
@@ -1288,9 +1501,4 @@ public class AMQPMessage extends RefCountMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
-
-   @Override
-   public long getPersistentSize() throws ActiveMQException {
-      return getEncodeSize();
-   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index bec0beb..c688124 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -50,7 +50,6 @@ public class AMQPMessagePersister extends MessagePersister {
          SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
    }
 
-
    /** Sub classes must add the first short as the protocol-id */
    @Override
    public void encode(ActiveMQBuffer buffer, Message record) {
@@ -62,7 +61,6 @@ public class AMQPMessagePersister extends MessagePersister {
       record.persist(buffer);
    }
 
-
    @Override
    public Message decode(ActiveMQBuffer buffer, Message record) {
       long id = buffer.readLong();
@@ -76,5 +74,4 @@ public class AMQPMessagePersister extends MessagePersister {
       }
       return record;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 7c4f425..fc31fc2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -63,12 +63,25 @@ public final class AMQPMessageSupport {
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
+    *
+    * @deprecated Use the SCHEDULED_DELIVERY_TIME value as this is not JMS specific and will be removed.
     */
+   @Deprecated
    public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
     */
+   public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
+
+   /**
+    * Attribute used to mark the Application defined delivery time assigned to the message
+    */
+   public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
+
+   /**
+    * Attribute used to mark the Application defined delivery time assigned to the message
+    */
    public static final Symbol ROUTING_TYPE = Symbol.getSymbol("x-opt-routing-type");
 
    /**
@@ -227,6 +240,24 @@ public final class AMQPMessageSupport {
    }
 
    /**
+    * Check whether the content-type given matches the expect value.
+    *
+    * @param expected
+    *        content type string to compare against or null if not expected to be set
+    * @param actual
+    *        the AMQP content type symbol from the Properties section
+    *
+    * @return true if content type matches
+    */
+   public static boolean isContentType(String expected, Symbol actual) {
+      if (expected == null) {
+         return actual == null;
+      } else {
+         return expected.equals(actual != null ? actual.toString() : actual);
+      }
+   }
+
+   /**
     * @param contentType
     *        the contentType of the received message
     * @return the character set to use, or null if not to treat the message as text


[3/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 45ba931..e147687 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -98,23 +98,29 @@ import io.netty.buffer.PooledByteBufAllocator;
  * */
 public class AmqpCoreConverter {
 
-   @SuppressWarnings("unchecked")
    public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+      return message.toCore(coreMessageObjectPools);
+   }
+
+   @SuppressWarnings("unchecked")
+   public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
+      final long messageId = message.getMessageID();
+      final Symbol contentType = properties != null ? properties.getContentType() : null;
+      final String contentTypeString = contentType != null ? contentType.toString() : null;
 
-      Section body = message.getProtonMessage().getBody();
       ServerJMSMessage result;
 
       if (body == null) {
-         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
-         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+            result = createObjectMessage(messageId, coreMessageObjectPools);
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
+            result = createBytesMessage(messageId, coreMessageObjectPools);
          } else {
-            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            Charset charset = getCharsetForTextualContent(contentTypeString);
             if (charset != null) {
-               result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
+               result = createTextMessage(messageId, coreMessageObjectPools);
             } else {
-               result = createMessage(message.getMessageID(), coreMessageObjectPools);
+               result = createMessage(messageId, coreMessageObjectPools);
             }
          }
 
@@ -122,30 +128,30 @@ public class AmqpCoreConverter {
       } else if (body instanceof Data) {
          Binary payload = ((Data) body).getValue();
 
-         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-            result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
-         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
-            result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+            result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
+            result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
          } else {
-            Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+            Charset charset = getCharsetForTextualContent(contentTypeString);
             if (StandardCharsets.UTF_8.equals(charset)) {
                ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
 
                try {
                   CharBuffer chars = charset.newDecoder().decode(buf);
-                  result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
+                  result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
                } catch (CharacterCodingException e) {
-                  result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+                  result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
                }
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
          }
 
          result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
       } else if (body instanceof AmqpSequence) {
          AmqpSequence sequence = (AmqpSequence) body;
-         ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+         ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
          for (Object item : sequence.getValue()) {
             m.writeObject(item);
          }
@@ -155,35 +161,35 @@ public class AmqpCoreConverter {
       } else if (body instanceof AmqpValue) {
          Object value = ((AmqpValue) body).getValue();
          if (value == null || value instanceof String) {
-            result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
+            result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
 
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
          } else if (value instanceof Binary) {
             Binary payload = (Binary) value;
 
-            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
-               result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
+            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
+               result = createObjectMessage(messageId, payload, coreMessageObjectPools);
             } else {
-               result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
             }
 
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
          } else if (value instanceof List) {
-            ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
+            ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
             for (Object item : (List<Object>) value) {
                m.writeObject(item);
             }
             result = m;
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
          } else if (value instanceof Map) {
-            result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
+            result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
             result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
          } else {
             ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
             try {
                TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
                TLSEncode.getEncoder().writeObject(body);
-               result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
+               result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
             } finally {
                buf.release();
                TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@@ -193,30 +199,38 @@ public class AmqpCoreConverter {
          throw new RuntimeException("Unexpected body type: " + body.getClass());
       }
 
-      TypedProperties properties = message.getExtraProperties();
-      if (properties != null) {
-         for (SimpleString str : properties.getPropertyNames()) {
-            if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
-               continue;
-            }
-            result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+      processHeader(result, header);
+      processMessageAnnotations(result, annotations);
+      processApplicationProperties(result, applicationProperties);
+      processProperties(result, properties);
+      processFooter(result, footer);
+      processExtraProperties(result, message.getExtraProperties());
+
+      // If the JMS expiration has not yet been set...
+      if (header != null && result.getJMSExpiration() == 0) {
+         // Then lets try to set it based on the message TTL.
+         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+         if (header.getTtl() != null) {
+            ttl = header.getTtl().longValue();
+         }
+
+         if (ttl == 0) {
+            result.setJMSExpiration(0);
+         } else {
+            result.setJMSExpiration(System.currentTimeMillis() + ttl);
          }
       }
 
-      populateMessage(result, message.getProtonMessage());
       result.getInnerMessage().setReplyTo(message.getReplyTo());
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
       result.getInnerMessage().setAddress(message.getAddressSimpleString());
-
       result.encode();
 
-      return result != null ? result.getInnerMessage() : null;
+      return result.getInnerMessage();
    }
 
-   @SuppressWarnings("unchecked")
-   protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-      Header header = amqp.getHeader();
+   protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
       if (header != null) {
          jms.setBooleanProperty(JMS_AMQP_HEADER, true);
 
@@ -248,9 +262,12 @@ public class AmqpCoreConverter {
          jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
       }
 
-      final MessageAnnotations ma = amqp.getMessageAnnotations();
-      if (ma != null) {
-         for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+      return jms;
+   }
+
+   protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
             String key = entry.getKey().toString();
             if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
                long deliveryTime = ((Number) entry.getValue()).longValue();
@@ -266,14 +283,33 @@ public class AmqpCoreConverter {
          }
       }
 
-      final ApplicationProperties ap = amqp.getApplicationProperties();
-      if (ap != null) {
-         for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
+      return jms;
+   }
+
+   private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
+      if (properties != null && properties.getValue() != null) {
+         for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
             setProperty(jms, entry.getKey(), entry.getValue());
          }
       }
 
-      final Properties properties = amqp.getProperties();
+      return jms;
+   }
+
+   private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
+      if (properties != null) {
+         for (SimpleString str : properties.getPropertyNames()) {
+            if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
+               continue;
+            }
+            jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
+         }
+      }
+
+      return jms;
+   }
+
+   private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
       if (properties != null) {
          if (properties.getMessageId() != null) {
             jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@@ -317,24 +353,13 @@ public class AmqpCoreConverter {
          }
       }
 
-      // If the jms expiration has not yet been set...
-      if (header != null && jms.getJMSExpiration() == 0) {
-         // Then lets try to set it based on the message ttl.
-         long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
-         if (header.getTtl() != null) {
-            ttl = header.getTtl().longValue();
-         }
-
-         if (ttl == 0) {
-            jms.setJMSExpiration(0);
-         } else {
-            jms.setJMSExpiration(System.currentTimeMillis() + ttl);
-         }
-      }
+      return jms;
+   }
 
-      final Footer fp = amqp.getFooter();
-      if (fp != null) {
-         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+   @SuppressWarnings("unchecked")
+   private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
+      if (footer != null && footer.getValue() != null) {
+         for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
             String key = entry.getKey().toString();
             setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 940a746..e929406 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -139,5 +139,4 @@ public class AmqpSupport {
 
       return null;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index bf46e81..d752bd7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -117,11 +117,9 @@ public class NettyWritable implements WritableBuffer {
    @Override
    public void put(ReadableBuffer buffer) {
       if (buffer.hasArray()) {
-         nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+         nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
       } else {
-         while (buffer.hasRemaining()) {
-            nettyBuffer.writeByte(buffer.get());
-         }
+         nettyBuffer.writeBytes(buffer.byteBuffer());
       }
    }
 }


[5/5] activemq-artemis git commit: This closes #2329

Posted by cl...@apache.org.
This closes #2329


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a463f03
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a463f03
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a463f03

Branch: refs/heads/master
Commit: 7a463f038ae324f2c5c908321b2ebf03b5a8e303
Parents: 2453978 a851a8f
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 26 09:19:41 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 26 09:19:41 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 1602 +++++++------
 .../amqp/broker/AMQPMessagePersister.java       |    3 -
 .../amqp/converter/AMQPMessageSupport.java      |   31 +
 .../amqp/converter/AmqpCoreConverter.java       |  149 +-
 .../protocol/amqp/proton/AmqpSupport.java       |    1 -
 .../protocol/amqp/util/NettyWritable.java       |    6 +-
 .../protocol/amqp/broker/AMQPMessageTest.java   | 2231 ++++++++++++++++++
 .../amqp/converter/TestConversions.java         |   38 +-
 .../JMSMappingInboundTransformerTest.java       |  124 +-
 .../JMSMappingOutboundTransformerTest.java      |   54 +-
 .../JMSTransformationSpeedComparisonTest.java   |   47 +-
 .../message/MessageTransformationTest.java      |   68 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |  438 ----
 .../protocol/amqp/util/NettyWritableTest.java   |   23 +
 .../amqp/AmqpExpiredMessageTest.java            |   16 +-
 .../integration/journal/MessageJournalTest.java |   22 +-
 16 files changed, 3491 insertions(+), 1362 deletions(-)
----------------------------------------------------------------------



[2/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
new file mode 100644
index 0000000..1e5d3d1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -0,0 +1,2231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class AMQPMessageTest {
+
+   private static final String TEST_TO_ADDRESS = "someAddress";
+
+   private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
+   private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
+
+   private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
+   private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
+
+   private static final String TEST_STRING_BODY = "test-string-body";
+
+   private byte[] encodedProtonMessage;
+
+   @Before
+   public void setUp() {
+      encodedProtonMessage = encodeMessage(createProtonMessage());
+   }
+
+   //----- Test Message Creation ---------------------------------------------//
+
+   @Test
+   public void testCreateMessageFromEncodedByteArrayData() {
+      // Constructor 1
+      AMQPMessage decoded = new AMQPMessage(0, encodedProtonMessage, null);
+
+      assertTrue(decoded.isDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+
+      // Constructor 2
+      decoded = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      assertTrue(decoded.isDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+   }
+
+   @Test
+   public void testCreateMessageFromEncodedReadableBuffer() {
+      AMQPMessage decoded = new AMQPMessage(0, ReadableBuffer.ByteBufferReader.wrap(encodedProtonMessage), null, null);
+
+      assertEquals(true, decoded.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+   }
+
+   @Test
+   public void testCreateMessageFromEncodedByteArrayDataWithExtraProperties() {
+      AMQPMessage decoded = new AMQPMessage(0, encodedProtonMessage, new TypedProperties(), null);
+
+      assertEquals(true, decoded.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+      assertNotNull(decoded.getExtraProperties());
+   }
+
+   @Test
+   public void testCreateMessageForPersistenceDataReload() throws ActiveMQException {
+      MessageImpl protonMessage = createProtonMessage();
+      ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
+
+      AMQPMessage message = new AMQPMessage(0);
+      try {
+         message.getProtonMessage();
+         fail("Should throw NPE due to not being initialized yet");
+      } catch (NullPointerException npe) {
+      }
+
+      final long persistedSize = (long) encoded.readableBytes();
+
+      // Now reload from encoded data
+      message.reloadPersistence(encoded);
+
+      assertEquals(persistedSize, message.getPersistSize());
+      assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
+      assertEquals(persistedSize - Integer.BYTES, message.getEncodeSize());
+      assertEquals(true, message.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, message.getAddress());
+   }
+
+   //----- Test Memory Estimate access ---------------------------------------//
+
+   @Test
+   public void testGetMemoryEstimate() {
+      AMQPMessage decoded = new AMQPMessage(0, encodedProtonMessage, new TypedProperties(), null);
+
+      int estimate = decoded.getMemoryEstimate();
+      assertTrue(encodedProtonMessage.length < decoded.getMemoryEstimate());
+      assertEquals(estimate, decoded.getMemoryEstimate());
+
+      decoded.putStringProperty(new SimpleString("newProperty"), "newValue");
+      decoded.reencode();
+
+      assertNotEquals(estimate, decoded.getMemoryEstimate());
+   }
+
+   //----- Test Connection ID access -----------------------------------------//
+
+   @Test
+   public void testGetConnectionID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(null, decoded.getConnectionID());
+   }
+
+   @Test
+   public void testSetConnectionID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      final String ID = UUID.randomUUID().toString();
+
+      assertEquals(null, decoded.getConnectionID());
+      decoded.setConnectionID(ID);
+      assertEquals(ID, decoded.getConnectionID());
+   }
+
+   @Test
+   public void testGetConnectionIDFromProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      final String ID = UUID.randomUUID().toString();
+
+      assertEquals(null, decoded.getConnectionID());
+      decoded.setConnectionID(ID);
+      assertEquals(ID, decoded.getConnectionID());
+      assertEquals(ID, decoded.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME));
+   }
+
+   //----- Test LastValueProperty access -------------------------------//
+
+   @Test
+   public void testGetLastValueFromMessageWithNone() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getLastValueProperty());
+   }
+
+   @Test
+   public void testSetLastValueFromMessageWithNone() {
+      SimpleString lastValue = new SimpleString("last-address");
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getLastValueProperty());
+      decoded.setLastValueProperty(lastValue);
+      assertEquals(lastValue, decoded.getLastValueProperty());
+   }
+
+   //----- Test User ID access -----------------------------------------//
+
+   @Test
+   public void getUserIDWhenNoPropertiesExists() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getUserID());
+      decoded.setUserID(UUID.randomUUID().toString());
+      assertNull(decoded.getUserID());
+   }
+
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenNotPresent() {
+      final String ID = UUID.randomUUID().toString();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getUserID());
+      assertNull(decoded.getProperties());
+
+      decoded.setUserID(ID);
+      decoded.reencode();
+
+      assertNull(decoded.getUserID());
+      assertNull(decoded.getProperties());
+   }
+
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentButNoMessageID() {
+      final String ID = UUID.randomUUID().toString();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNull(decoded.getProperties().getMessageId());
+
+      decoded.setUserID(ID);
+      decoded.reencode();
+
+      assertNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNull(decoded.getProperties().getMessageId());
+   }
+
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentWithMessageID() {
+      final String ID = UUID.randomUUID().toString();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setMessageId(ID);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNotNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNotNull(decoded.getProperties().getMessageId());
+      assertEquals(ID, decoded.getUserID());
+
+      decoded.setUserID(ID);
+      decoded.reencode();
+
+      assertNotNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNotNull(decoded.getProperties().getMessageId());
+      assertEquals(ID, decoded.getUserID());
+   }
+
+   //----- Test the getDuplicateProperty methods -----------------------------//
+
+   @Test
+   public void testGetDuplicateProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(null, decoded.getDuplicateProperty());
+   }
+
+   //----- Test the getAddress methods ---------------------------------------//
+
+   @Test
+   public void testGetAddressFromMessage() {
+      final String ADDRESS = "myQueue";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddress());
+   }
+
+   @Test
+   public void testGetAddressSimpleStringFromMessage() {
+      final String ADDRESS = "myQueue";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
+   }
+
+   @Test
+   public void testGetAddressFromMessageWithNoValueSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getAddress());
+      assertNull(decoded.getAddressSimpleString());
+   }
+
+   @Test
+   public void testSetAddressFromMessage() {
+      final String ADDRESS = "myQueue";
+      final SimpleString NEW_ADDRESS = new SimpleString("myQueue-1");
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddress());
+      decoded.setAddress(NEW_ADDRESS);
+      assertEquals(NEW_ADDRESS, decoded.getAddressSimpleString());
+   }
+
+   @Test
+   public void testSetAddressFromMessageUpdatesPropertiesOnReencode() {
+      final String ADDRESS = "myQueue";
+      final SimpleString NEW_ADDRESS = new SimpleString("myQueue-1");
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setAddress(ADDRESS);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(ADDRESS, decoded.getAddress());
+      decoded.setAddress(NEW_ADDRESS);
+      decoded.reencode();
+
+      assertEquals(NEW_ADDRESS.toString(), decoded.getProperties().getTo());
+      assertEquals(NEW_ADDRESS, decoded.getAddressSimpleString());
+   }
+
+   //----- Test the durability set and get methods ---------------------------//
+
+   @Test
+   public void testIsDurableFromMessageWithHeaderTaggedAsTrue() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setDurable(true);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertTrue(decoded.isDurable());
+   }
+
+   @Test
+   public void testIsDurableFromMessageWithHeaderTaggedAsFalse() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setDurable(false);
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertFalse(decoded.isDurable());
+   }
+
+   @Test
+   public void testIsDurableFromMessageWithNoValueSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertFalse(decoded.isDurable());
+   }
+
+   @Test
+   public void testIsDuranleReturnsTrueOnceUpdated() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertFalse(decoded.isDurable());
+      decoded.setDurable(true);
+      assertTrue(decoded.isDurable());
+   }
+
+   @Test
+   public void testNonDurableMessageReencodedToDurable() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertFalse(decoded.isDurable());
+
+      // Underlying message data not updated yet
+      assertNull(decoded.getHeader().getDurable());
+
+      decoded.setDurable(true);
+      decoded.reencode();
+      assertTrue(decoded.isDurable());
+
+      // Underlying message data now updated
+      assertTrue(decoded.getHeader().getDurable());
+   }
+
+   @Test
+   public void testMessageWithNoHeaderGetsOneWhenDurableSetAndReencoded() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertFalse(decoded.isDurable());
+
+      // Underlying message data not updated yet
+      assertNull(decoded.getHeader());
+
+      decoded.setDurable(true);
+      decoded.reencode();
+      assertTrue(decoded.isDurable());
+
+      // Underlying message data now updated
+      Header header = decoded.getHeader();
+      assertNotNull(header);
+      assertTrue(header.getDurable());
+   }
+
+   //----- Test RoutingType access -------------------------------------------//
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithoutIt() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getRoutingType());
+   }
+
+   @Test
+   public void testSetRoutingType() {
+      RoutingType type = RoutingType.ANYCAST;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getRoutingType());
+      decoded.setRoutingType(type);
+      assertEquals(type, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testSetRoutingTypeToClear() {
+      RoutingType type = RoutingType.ANYCAST;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getRoutingType());
+      decoded.setRoutingType(type);
+      assertEquals(type, decoded.getRoutingType());
+      decoded.setRoutingType(null);
+      assertNull(decoded.getRoutingType());
+   }
+
+   @Test
+   public void testRemoveRoutingTypeFromMessageEncodedWithOne() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
+      decoded.setRoutingType(null);
+      decoded.reencode();
+      assertNull(decoded.getRoutingType());
+
+      assertTrue(decoded.getMessageAnnotations().getValue().isEmpty());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithAnyCastType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithMulticastType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.MULTICAST.getType());
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithQueueType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.QUEUE_TYPE);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithTempQueueType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_QUEUE_TYPE);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithTopicType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TOPIC_TYPE);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithTempTopicType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_TOPIC_TYPE);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
+   }
+
+   @Test
+   public void testGetRoutingTypeFromMessageWithUnknownType() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, (byte) 32);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getRoutingType());
+   }
+
+   //----- Test access to message Group ID -----------------------------------//
+
+   @Test
+   public void testGetGroupIDFromMessage() {
+      final String GROUP_ID = "group-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setGroupId(GROUP_ID);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(GROUP_ID, decoded.getGroupID().toString());
+   }
+
+   @Test
+   public void testGetGroupIDFromMessageWithNoGroupId() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getGroupID());
+   }
+
+   @Test
+   public void testGetGroupIDFromMessageWithNoProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getGroupID());
+   }
+
+   //----- Test access to message Group ID -----------------------------------//
+
+   @Test
+   public void testGetReplyToFromMessage() {
+      final String REPLY_TO = "address-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setReplyTo(REPLY_TO);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(REPLY_TO, decoded.getReplyTo().toString());
+   }
+
+   @Test
+   public void testGetReplyToFromMessageWithNoReplyTo() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getReplyTo());
+   }
+
+   @Test
+   public void testGetReplyToFromMessageWithNoProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getReplyTo());
+   }
+
+   @Test
+   public void testSetReplyToFromMessageWithProperties() {
+      final String REPLY_TO = "address-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getReplyTo());
+
+      decoded.setReplyTo(new SimpleString(REPLY_TO));
+      decoded.reencode();
+
+      assertEquals(REPLY_TO, decoded.getReplyTo().toString());
+      assertEquals(REPLY_TO, decoded.getProperties().getReplyTo());
+   }
+
+   @Test
+   public void testSetReplyToFromMessageWithNoProperties() {
+      final String REPLY_TO = "address-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertNull(decoded.getReplyTo());
+
+      decoded.setReplyTo(new SimpleString(REPLY_TO));
+      decoded.reencode();
+
+      assertEquals(REPLY_TO, decoded.getReplyTo().toString());
+      assertEquals(REPLY_TO, decoded.getProperties().getReplyTo());
+   }
+
+   @Test
+   public void testSetReplyToFromMessageWithPropertiesCanClear() {
+      final String REPLY_TO = "address-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setReplyTo(REPLY_TO);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertEquals(REPLY_TO, decoded.getReplyTo().toString());
+
+      decoded.setReplyTo(null);
+      decoded.reencode();
+
+      assertEquals(null, decoded.getReplyTo());
+      assertEquals(null, decoded.getProperties().getReplyTo());
+   }
+
+   //----- Test access to User ID --------------------------------------------//
+
+   @Test
+   public void testGetUserIDFromMessage() {
+      final String USER_NAME = "foo";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(USER_NAME, decoded.getAMQPUserID());
+   }
+
+   @Test
+   public void testGetUserIDFromMessageWithNoProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getAMQPUserID());
+   }
+
+   @Test
+   public void testGetUserIDFromMessageWithNoUserID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getAMQPUserID());
+   }
+
+   //----- Test access message priority --------------------------------------//
+
+   @Test
+   public void testGetPriorityFromMessage() {
+      final short PRIORITY = 7;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setPriority(PRIORITY);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(PRIORITY, decoded.getPriority());
+   }
+
+   @Test
+   public void testGetPriorityFromMessageWithNoHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
+   }
+
+   @Test
+   public void testGetPriorityFromMessageWithNoPrioritySet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
+   }
+
+   @Test
+   public void testSetPriorityOnMessageWithHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
+
+      decoded.setPriority((byte) 9);
+      decoded.reencode();
+
+      assertEquals(9, decoded.getPriority());
+      assertEquals(9, decoded.getHeader().getPriority().byteValue());
+   }
+
+   @Test
+   public void testSetPriorityOnMessageWithoutHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
+
+      decoded.setPriority((byte) 9);
+      decoded.reencode();
+
+      assertEquals(9, decoded.getPriority());
+      assertEquals(9, decoded.getHeader().getPriority().byteValue());
+   }
+
+   //----- Test access message expiration ------------------------------------//
+
+   @Test
+   public void testGetExpirationFromMessageWithNoHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageWithNoTTLInHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageWithNoTTLInHeaderOrExpirationInProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageUsingTTL() {
+      final long ttl = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setTtl(ttl);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getExpiration() > System.currentTimeMillis());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageUsingAbsoluteExpiration() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageUsingAbsoluteExpirationNegative() {
+      final Date expirationTime = new Date(-1);
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageAbsoluteExpirationOVerrideTTL() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+      final long ttl = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setTtl(ttl);
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testSetExpiration() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testSetExpirationUpdatesProperties() {
+      final Date originalExpirationTime = new Date(System.currentTimeMillis());
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setExpiryTime(originalExpirationTime.getTime());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(originalExpirationTime.getTime(), decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+
+      decoded.reencode();
+      assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationAddsPropertiesWhenNonePresent() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+
+      decoded.reencode();
+      assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationToClearUpdatesPropertiesWhenPresent() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setExpiryTime(expirationTime.getTime());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+      decoded.setExpiration(-1);
+      assertEquals(0, decoded.getExpiration());
+
+      decoded.reencode();
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationToClearDoesNotAddPropertiesWhenNonePresent() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(-1);
+      assertEquals(0, decoded.getExpiration());
+
+      decoded.reencode();
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getProperties());
+   }
+
+   @Test
+   public void testSetExpirationToClearUpdateHeader() {
+      final long ttl = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setTtl(ttl);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getExpiration() > System.currentTimeMillis());
+
+      decoded.setExpiration(-1);
+      decoded.reencode();
+
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getHeader().getTtl());
+   }
+
+   //----- Test access message time stamp ------------------------------------//
+
+   @Test
+   public void testGetTimestampFromMessage() {
+      Date timestamp = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      Properties properties = new Properties();
+      properties.setCreationTime(timestamp);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(timestamp.getTime(), decoded.getTimestamp());
+   }
+
+   @Test
+   public void testGetTimestampFromMessageWithNoCreateTimeSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+   }
+
+   @Test
+   public void testGetTimestampFromMessageWithNoHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+   }
+
+   @Test
+   public void testSetTimestampOnMessage() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+
+      Date createTime = new Date(System.currentTimeMillis());
+
+      decoded.setTimestamp(createTime.getTime());
+      decoded.reencode();
+
+      assertEquals(createTime.getTime(), decoded.getTimestamp());
+      assertEquals(createTime, decoded.getProperties().getCreationTime());
+   }
+
+   @Test
+   public void testSetTimestampOnMessageWithNoPropertiesSection() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+
+      Date createTime = new Date(System.currentTimeMillis());
+
+      decoded.setTimestamp(createTime.getTime());
+      decoded.reencode();
+
+      assertNotNull(decoded.getProperties());
+      assertEquals(createTime.getTime(), decoded.getTimestamp());
+      assertEquals(createTime, decoded.getProperties().getCreationTime());
+   }
+
+   //----- Test access to message scheduled delivery time --------------------//
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedTime() {
+      final long scheduledTime = System.currentTimeMillis();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedTimeAndDelay() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedDelay() {
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeWhenMessageHasNoSetValue() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertEquals(0, decoded.getScheduledDeliveryTime().longValue());
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeWhenNonPresent() {
+      final long scheduledTime = System.currentTimeMillis() + 5000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getScheduledDeliveryTime().longValue());
+      decoded.setScheduledDeliveryTime(scheduledTime);
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+
+      decoded.reencode();
+
+      assertEquals(scheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeMessageSentWithFixedTime() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long newScheduledTime = System.currentTimeMillis() + 1000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+
+      decoded.setScheduledDeliveryTime(newScheduledTime);
+      assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      decoded.reencode();
+      assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeMessageSentWithFixedDelay() {
+      final long scheduledDelay = 100000;
+      final long newScheduledTime = System.currentTimeMillis() + 1000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
+
+      decoded.setScheduledDeliveryTime(newScheduledTime);
+      assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      decoded.reencode();
+      assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeToNoneClearsDelayAndTimeValues() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+
+      decoded.setScheduledDeliveryTime((long) 0);
+      decoded.reencode();
+      assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+      assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY));
+   }
+
+   //----- Tests access to Message Annotations -------------------------------//
+
+   @Test
+   public void testGetAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      Object result = message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+      String stringResult = message.getAnnotationString(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+
+      assertEquals(result, stringResult);
+   }
+
+   @Test
+   public void testRemoveAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      assertNotNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
+      message.removeAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+      assertNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
+
+      message.reencode();
+
+      assertTrue(message.getMessageAnnotations().getValue().isEmpty());
+   }
+
+   @Test
+   public void testSetAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      final SimpleString newAnnotation = new SimpleString("testSetAnnotation");
+      final String newValue = "newValue";
+
+      message.setAnnotation(newAnnotation, newValue);
+      assertEquals(newValue, message.getAnnotation(newAnnotation));
+
+      message.reencode();
+
+      assertEquals(newValue, message.getMessageAnnotations().getValue().get(Symbol.valueOf(newAnnotation.toString())));
+   }
+
+   //----- Tests accessing Proton Sections from encoded data -----------------//
+
+   @Test
+   public void testGetProtonMessage() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      assertProtonMessageEquals(protonMessage, message.getProtonMessage());
+
+      message.setAnnotation(new SimpleString("testGetProtonMessage"), "1");
+      message.messageChanged();
+
+      assertProtonMessageNotEquals(protonMessage, message.getProtonMessage());
+   }
+
+   @Test
+   public void testGetHeader() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      Header decoded = message.getHeader();
+      assertNotSame(decoded, protonMessage.getHeader());
+      assertHeaderEquals(protonMessage.getHeader(), decoded);
+
+      // Update the values
+      decoded.setDeliveryCount(UnsignedInteger.ZERO);
+      decoded.setTtl(UnsignedInteger.valueOf(255));
+      decoded.setFirstAcquirer(true);
+
+      // Check that the message is unaffected.
+      assertHeaderNotEquals(protonMessage.getHeader(), decoded);
+   }
+
+   @Test
+   public void testGetProperties() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      Properties decoded = message.getProperties();
+      assertNotSame(decoded, protonMessage.getProperties());
+      assertPropertiesEquals(protonMessage.getProperties(), decoded);
+
+      // Update the values
+      decoded.setAbsoluteExpiryTime(new Date(System.currentTimeMillis()));
+      decoded.setGroupSequence(UnsignedInteger.valueOf(255));
+      decoded.setSubject(UUID.randomUUID().toString());
+
+      // Check that the message is unaffected.
+      assertPropertiesNotEquals(protonMessage.getProperties(), decoded);
+   }
+
+   @Test
+   public void testGetDeliveryAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      DeliveryAnnotations decoded = message.getDeliveryAnnotations();
+      assertNotSame(decoded, protonMessage.getDeliveryAnnotations());
+      assertDeliveryAnnotationsEquals(protonMessage.getDeliveryAnnotations(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
+
+      // Check that the message is unaffected.
+      assertDeliveryAnnotationsNotEquals(protonMessage.getDeliveryAnnotations(), decoded);
+   }
+
+   @Test
+   public void testGetMessageAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      MessageAnnotations decoded = message.getMessageAnnotations();
+      assertNotSame(decoded, protonMessage.getMessageAnnotations());
+      assertMessageAnnotationsEquals(protonMessage.getMessageAnnotations(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test");
+
+      // Check that the message is unaffected.
+      assertMessageAnnotationsNotEquals(protonMessage.getMessageAnnotations(), decoded);
+   }
+
+   @Test
+   public void testGetApplicationProperties() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      ApplicationProperties decoded = message.getApplicationProperties();
+      assertNotSame(decoded, protonMessage.getApplicationProperties());
+      assertApplicationPropertiesEquals(protonMessage.getApplicationProperties(), decoded);
+
+      // Update the values
+      decoded.getValue().put(UUID.randomUUID().toString(), "test");
+
+      // Check that the message is unaffected.
+      assertApplicationPropertiesNotEquals(protonMessage.getApplicationProperties(), decoded);
+   }
+
+   @Test
+   public void testGetBody() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      Object body = message.getBody();
+      assertTrue(body instanceof AmqpValue);
+      AmqpValue amqpValueBody = (AmqpValue) body;
+
+      assertNotNull(amqpValueBody.getValue());
+      assertNotSame(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
+      assertEquals(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
+   }
+
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testGetFooter() {
+      MessageImpl protonMessage = createProtonMessage();
+      Footer footer = new Footer(new HashMap<>());
+      footer.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
+      protonMessage.setFooter(footer);
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      Footer decoded = message.getFooter();
+      assertNotSame(decoded, protonMessage.getFooter());
+      assertFootersEquals(protonMessage.getFooter(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
+
+      // Check that the message is unaffected.
+      assertFootersNotEquals(protonMessage.getFooter(), decoded);
+   }
+
+   //----- Test re-encode of updated message sections ------------------------//
+
+   @Test
+   public void testApplicationPropertiesReencodeAfterUpdate() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
+
+      decoded.putStringProperty("key-2", "value-2");
+      decoded.reencode();
+
+      assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
+
+      assertEquals(decoded.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE);
+      assertEquals(decoded.getStringProperty("key-2"), "value-2");
+   }
+
+   @Test
+   public void testMessageAnnotationsReencodeAfterUpdate() {
+      final SimpleString TEST_ANNOTATION = new SimpleString("testMessageAnnotationsReencodeAfterUpdate");
+
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
+
+      decoded.setAnnotation(TEST_ANNOTATION, "value-2");
+      decoded.reencode();
+
+      assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
+
+      assertEquals(decoded.getAnnotation(TEST_ANNOTATION), "value-2");
+   }
+
+   //----- Test handling of message extra properties -------------------------//
+
+   @Test
+   public void testExtraByteProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      byte[] value = RandomUtil.randomBytes();
+      SimpleString name = SimpleString.toSimpleString("myProperty");
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getExtraProperties());
+      assertNull(decoded.getExtraBytesProperty(name));
+      assertNull(decoded.removeExtraBytesProperty(name));
+
+      decoded.putExtraBytesProperty(name, value);
+      assertFalse(decoded.getExtraProperties().isEmpty());
+
+      assertTrue(Arrays.equals(value, decoded.getExtraBytesProperty(name)));
+      assertTrue(Arrays.equals(value, decoded.removeExtraBytesProperty(name)));
+   }
+
+   @Test
+   public void testExtraProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      byte[] original = RandomUtil.randomBytes();
+      SimpleString name = SimpleString.toSimpleString("myProperty");
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      decoded.setAddress("someAddress");
+      decoded.setMessageID(33);
+      decoded.putExtraBytesProperty(name, original);
+
+      ICoreMessage coreMessage = decoded.toCore();
+      Assert.assertEquals(original, coreMessage.getBytesProperty(name));
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
+      try {
+         decoded.getPersister().encode(buffer, decoded);
+         Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
+         AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      } finally {
+         buffer.release();
+      }
+
+      {
+         ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
+         AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      }
+   }
+
+   //----- Test that message decode ignores unused sections ------------------//
+
+   private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
+   private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
+   private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
+
+   @Test
+   public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal body of AmqpValue but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
+      encodedBytes.writeByte(EncodingCodes.MAP8);
+      encodedBytes.writeByte(2);  // Size
+      encodedBytes.writeByte(2);  // Elements
+      // Use bad encoding code on underlying type of map key which will fail the decode if run
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      try {
+         // This should perform the lazy decode of the DeliveryAnnotations portion of the message
+         message.getDeliveryAnnotations();
+         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal body of AmqpValue but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
+      encodedBytes.writeByte(EncodingCodes.MAP8);
+      encodedBytes.writeByte(2);  // Size
+      encodedBytes.writeByte(2);  // Elements
+      // Use bad encoding code on underlying type of map key which will fail the decode if run
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      assertTrue(message.isDurable());
+
+      try {
+         // This should perform the lazy decode of the ApplicationProperties portion of the message
+         message.getStringProperty("test");
+         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresBodyByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal body of AmqpValue but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
+      // Use bad encoding code on underlying type
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      assertTrue(message.isDurable());
+
+      try {
+         // This will decode the body section if present in order to present it as a Proton Message object
+         message.getBody();
+         fail("Should have thrown an error when attempting to decode the body which is malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   //----- Tests for message copy correctness --------------------------------//
+
+   @Test
+   public void testCopyMessage() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy();
+
+      assertEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   @Test
+   public void testCopyMessageWithNewArtemisMessageID() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy(255);
+
+      assertNotEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   @Test
+   public void testCopyMessageDoesNotRemovesMessageAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("testCopyMessageRemovesMessageAnnotations"), "1");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy();
+
+      assertEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+      assertNotNull(copy.getDeliveryAnnotations());
+   }
+
+   @Test
+   public void testDecodeCopyUpdateReencodeAndThenDecodeAgain() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      // Sanity checks
+      assertTrue(message.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
+
+      // Copy the message
+      message = (AMQPMessage) message.copy();
+
+      // Sanity checks
+      assertTrue(message.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
+
+      // Update the message
+      message.setAnnotation(new SimpleString("x-opt-extra-1"), "test-1");
+      message.setAnnotation(new SimpleString("x-opt-extra-2"), "test-2");
+      message.setAnnotation(new SimpleString("x-opt-extra-3"), "test-3");
+
+      // Reencode and then decode the message again
+      message.reencode();
+
+      // Sanity checks
+      assertTrue(message.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
+   }
+
+   //----- Test sendBuffer method --------------------------------------------//
+
+   @Test
+   public void testSendBuffer() {
+      ByteBuf buffer = Unpooled.buffer(255);
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      message.sendBuffer(buffer, 1);
+
+      assertNotNull(buffer);
+
+      AMQPMessage copy = new AMQPMessage(0, new NettyReadable(buffer), null, null);
+
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   //----- Test getSendBuffer variations -------------------------------------//
+
+   @Test
+   public void testGetSendBuffer() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      ReadableBuffer buffer = message.getSendBuffer(1);
+      assertNotNull(buffer);
+      assertTrue(buffer.hasArray());
+
+      assertTrue(Arrays.equals(encodedProtonMessage, buffer.array()));
+
+      AMQPMessage copy = new AMQPMessage(0, buffer, null, null);
+
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessage() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      ReadableBuffer buffer = message.getSendBuffer(7);
+      assertNotNull(buffer);
+      message.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage copy = new AMQPMessage(0, buffer, null, null);
+
+      MessageImpl originalsProtonMessage = message.getProtonMessage();
+      MessageImpl copyProtonMessage = copy.getProtonMessage();
+      assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
+
+      assertNull(originalsProtonMessage.getHeader().getDeliveryCount());
+      assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
+   }
+
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageOriginalHadNoHeader() {
+      MessageImpl protonMessage = (MessageImpl) Proton.message();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      ReadableBuffer buffer = message.getSendBuffer(7);
+      assertNotNull(buffer);
+      message.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage copy = new AMQPMessage(0, buffer, null, null);
+
+      MessageImpl originalsProtonMessage = message.getProtonMessage();
+      MessageImpl copyProtonMessage = copy.getProtonMessage();
+      assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
+
+      assertNull(originalsProtonMessage.getHeader());
+      assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
+   }
+
+   @Test
+   public void testGetSendBufferRemoveDeliveryAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      ReadableBuffer buffer = message.getSendBuffer(1);
+      assertNotNull(buffer);
+
+      AMQPMessage copy = new AMQPMessage(0, buffer, null, null);
+
+      MessageImpl copyProtonMessage = copy.getProtonMessage();
+      assertProtonMessageNotEquals(message.getProtonMessage(), copyProtonMessage);
+      assertNull(copyProtonMessage.getDeliveryAnnotations());
+   }
+
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageAndTrimsDeliveryAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      ReadableBuffer buffer = message.getSendBuffer(7);
+      assertNotNull(buffer);
+      message.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage copy = new AMQPMessage(0, buffer, null, null);
+
+      MessageImpl originalsProtonMessage = message.getProtonMessage();
+      MessageImpl copyProtonMessage = copy.getProtonMessage();
+      assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
+
+      assertNull(originalsProtonMessage.getHeader().getDeliveryCount());
+      assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
+      assertNull(copyProtonMessage.getDeliveryAnnotations());
+   }
+
+   //----- Test reencode method ----------------------------------------------//
+
+   @Test
+   public void testReencodeOnMessageWithNoPayoad() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithFullPayoad() {
+      doTestMessageReencodeProducesEqualMessage(true, true, true, true, true, true, true);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithHeadersOnly() {
+      doTestMessageReencodeProducesEqualMessage(true, false, false, false, false, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithDeliveryAnnotationsOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, true, false, false, false, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithMessageAnnotationsOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, true, false, false, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithPropertiesOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, true, false, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, false, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithBodyOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, true, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithFooterOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, true);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesAndBody() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, false);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, true);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithPropertiesApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, true, true, true, true);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithMessageAnnotationsPropertiesApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, true, true, true, true, true);
+   }
+
+   @Test
+   public void testReencodeOnMessageWithDeliveryAnnotationsMessageAnnotationsPropertiesApplicationPropertiesBodyFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, true, true, true, true, true, true);
+   }
+
+   @SuppressWarnings("unchecked")
+   private void doTestMessageReencodeProducesEqualMessage(
+      boolean header, boolean deliveryAnnotations, boolean messageAnnotations, boolean properties, boolean applicationProperties, boolean body, boolean footer) {
+
+      MessageImpl protonMessage = (MessageImpl) Proton.message();
+
+      if (header) {
+         Header headers = new Header();
+         headers.setDurable(true);
+         headers.setPriority(UnsignedByte.valueOf((byte) 9));
+         protonMessage.setHeader(headers);
+      }
+
+      if (properties) {
+         Properties props = new Properties();
+         props.setCreationTime(new Date(System.currentTimeMillis()));
+         props.setTo(TEST_TO_ADDRESS);
+         props.setMessageId(UUID.randomUUID());
+         protonMessage.setProperties(props);
+      }
+
+      if (deliveryAnnotations) {
+         DeliveryAnnotations annotations = new DeliveryAnnotations(new HashMap<>());
+         annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_DA"), TEST_MESSAGE_ANNOTATION_VALUE);
+         protonMessage.setDeliveryAnnotations(annotations);
+      }
+
+      if (messageAnnotations) {
+         MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+         annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+         protonMessage.setMessageAnnotations(annotations);
+      }
+
+      if (applicationProperties) {
+         ApplicationProperties appProps = new ApplicationProperties(new HashMap<>());
+         appProps.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+         protonMessage.setApplicationProperties(appProps);
+      }
+
+      if (body) {
+         AmqpValue text = new AmqpValue(TEST_STRING_BODY);
+         protonMessage.setBody(text);
+      }
+
+      if (footer) {
+         Footer foot = new Footer(new HashMap<>());
+         foot.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_FOOT"), TEST_MESSAGE_ANNOTATION_VALUE);
+         protonMessage.setFooter(foot);
+      }
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      message.reencode();
+
+      if (deliveryAnnotations) {
+         assertProtonMessageNotEquals(protonMessage, message.getProtonMessage());
+      } else {
+         assertProtonMessageEquals(protonMessage, message.getProtonMessage());
+      }
+   }
+
+   //----- Test Support ------------------------------------------------------//
+
+   private MessageImpl createProtonMessage() {
+      MessageImpl message = (MessageImpl) Proton.message();
+
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 9));
+
+      Properties properties = new Properties();
+      properties.setCreationTime(new Date(System.currentTimeMillis()));
+      properties.setTo(TEST_TO_ADDRESS);
+      properties.setMessageId(UUID.randomUUID());
+
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+
+      ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>());
+      applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+
+      AmqpValue body = new AmqpValue(TEST_STRING_BODY);
+
+      message.setHeader(header);
+      message.setMessageAnnotations(annotations);
+      message.setProperties(properties);
+      message.setApplicationProperties(applicationProperties);
+      message.setBody(body);
+
+      return message;
+   }
+
+   private void assertProtonMessageEquals(MessageImpl left, MessageImpl right) {
+      if (!isEquals(left, right)) {
+         fail("MessageImpl values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertProtonMessageNotEquals(MessageImpl left, MessageImpl right) {
+      if (isEquals(left, right)) {
+         fail("MessageImpl values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(MessageImpl left, MessageImpl right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      try {
+         assertHeaderEquals(left.getHeader(), right.getHeader());
+         assertDeliveryAnnotationsEquals(left.getDeliveryAnnotations(), right.getDeliveryAnnotations());
+         assertMessageAnnotationsEquals(left.getMessageAnnotations(), right.getMessageAnnotations());
+         assertPropertiesEquals(left.getProperties(), right.getProperties());
+         assertApplicationPropertiesEquals(left.getApplicationProperties(), right.getApplicationProperties());
+         assertTrue(isEquals(left.getBody(), right.getBody()));
+         assertFootersEquals(left.getFooter(), right.getFooter());
+      } catch (Throwable e) {
+         return false;
+      }
+
+      return true;
+   }
+
+   private void assertHeaderEquals(Header left, Header right) {
+      if (!isEquals(left, right)) {
+         fail("Header values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertHeaderNotEquals(Header left, Header right) {
+      if (isEquals(left, right)) {
+         fail("Header values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(Header left, Header right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      try {
+         assertEquals(left.getDurable(), right.getDurable());
+         assertEquals(left.getDeliveryCount(), right.getDeliveryCount());
+         assertEquals(left.getFirstAcquirer(), right.getFirstAcquirer());
+         assertEquals(left.getPriority(), right.getPriority());
+         assertEquals(left.getTtl(), right.getTtl());
+      } catch (Throwable e) {
+         return false;
+      }
+
+      return true;
+   }
+
+   private void assertPropertiesEquals(Properties left, Properties right) {
+      if (!isEquals(left, right)) {
+         fail("Properties values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertPropertiesNotEquals(Properties left, Properties right) {
+      if (isEquals(left, right)) {
+         fail("Properties values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(Properties left, Properties right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      try {
+         assertEquals(left.getAbsoluteExpiryTime(), right.getAbsoluteExpiryTime());
+         assertEquals(left.getContentEncoding(), right.getAbsoluteExpiryTime());
+         assertEquals(left.getContentType(), right.getContentType());
+         assertEquals(left.getCorrelationId(), right.getCorrelationId());
+         assertEquals(left.getCreationTime(), right.getCreationTime());
+         assertEquals(left.getGroupId(), right.getGroupId());
+         assertEquals(left.getGroupSequence(), right.getGroupSequence());
+         assertEquals(left.getMessageId(), right.getMessageId());
+         assertEquals(left.getReplyTo(), right.getReplyTo());
+         assertEquals(left.getReplyToGroupId(), right.getReplyToGroupId());
+         assertEquals(left.getSubject(), right.getSubject());
+         assertEquals(left.getUserId(), right.getUserId());
+         assertEquals(left.getTo(), right.getTo());
+      } catch (Throwable e) {
+         return false;
+      }
+
+      return true;
+   }
+
+   private void assertMessageAnnotationsEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (!isEquals(left, right)) {
+         fail("MessageAnnotations values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertMessageAnnotationsNotEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (isEquals(left, right)) {
+         fail("MessageAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   private void assertDeliveryAnnotationsEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (!isEquals(left, right)) {
+         fail("DeliveryAnnotations values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertDeliveryAnnotationsNotEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (isEquals(left, right)) {
+         fail("DeliveryAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   private void assertApplicationPropertiesEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (!isEquals(left, right)) {
+         fail("ApplicationProperties values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertApplicationPropertiesNotEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (isEquals(left, right)) {
+         fail("ApplicationProperties values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   private void assertFootersEquals(Footer left, Footer right) {
+      if (!isEquals(left, right)) {
+         fail("Footer values should be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private void assertFootersNotEquals(Footer left, Footer right) {
+      if (isEquals(left, right)) {
+         fail("Footer values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   private boolean isEquals(Footer left, Footer right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   private boolean isEquals(Map<?, ?> left, Map<?, ?> right) {
+      if (left == null && right == null) {
+         return true;
+      }
+
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      if (left.size() != right.size()) {
+         return false;
+      }
+
+      for (Object leftKey : left.keySet()) {
+         assertEquals(right.get(leftKey), left.get(leftKey));
+      }
+
+      for (Object rightKey : right.keySet()) {
+         assertEquals(left.get(rightKey), right.get(rightKey));
+      }
+
+      return true;
+   }
+
+   @SuppressWarnings("unchecked")
+   private boolean isEquals(Section left, Section right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      assertTrue(left.getClass().equals(right.getClass()));
+
+      if (left instanceof AmqpValue) {
+         AmqpValue leftValue = (AmqpValue) left;
+         AmqpValue rightValue = (AmqpValue) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         assertEquals(leftValue.getValue(), rightValue.getValue());
+      } else if (left instanceof AmqpSequence) {
+         AmqpSequence leftValue = (AmqpSequence) left;
+         AmqpSequence rightValue = (AmqpSequence) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         List<Object> leftList = leftValue.getValue();
+         List<Object> rightList = leftValue.getValue();
+
+         assertEquals(leftList.size(), rightList.size());
+
+         for (int i = 0; i < leftList.size(); ++i) {
+            assertEquals(leftList.get(i), rightList.get(i));
+         }
+      } else if (left instanceof Data) {
+         Data leftValue = (Data) left;
+         Data rightValue = (Data) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         byte[] leftArray = leftValue.getValue().getArray();
+         byte[] rightArray = rightValue.getValue().getArray();
+
+         if (leftArray == null && leftArray == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftArray, rightArray)) {
+            return false;
+         }
+
+         assertTrue(Arrays.equals(leftArray, rightArray));
+      } else {
+         return false;
+      }
+
+      return true;
+   }
+
+   private boolean isNullnessEquals(Object left, Object right) {
+      if (left == null && right != null) {
+         return false;
+      }
+      if (left != null && right == null) {
+         return false;
+      }
+
+      return true;
+   }
+
+   private ActiveMQBuffer encodeMessageAsPersistedBuffer(MessageImpl message) {
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      message.encode(new NettyWritable(nettyBuffer));
+      byte[] bytes = new byte[nettyBuffer.writerIndex() + Integer.BYTES];
+      nettyBuffer.readBytes(bytes, Integer.BYTES, nettyBuffer.readableBytes());
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bytes);
+      buffer.writerIndex(0);
+      buffer.writeInt(bytes.length - Integer.BYTES);
+      buffer.setIndex(0, bytes.length);
+
+      return buffer;
+   }
+
+   private byte[] encodeMessage(MessageImpl message) {
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      message.encode(new NettyWritable(nettyBuffer));
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
+
+      return bytes;
+   }
+
+   private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
+      ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+      message.encode(new NettyWritable(nettyBuffer));
+      byte[] bytes = new byte[nettyBuffer.writerIndex()];
+      nettyBuffer.readBytes(bytes);
+
+      return new AMQPMessage(0, bytes, null);
+   }
+}