You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/26 13:19:49 UTC

[2/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
new file mode 100644
index 0000000..1e5d3d1
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -0,0 +1,2231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class AMQPMessageTest {
+
+   private static final String TEST_TO_ADDRESS = "someAddress"; // address the tests expect after decoding encodedProtonMessage
+
+   private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
+   private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
+
+   private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
+   private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
+
+   private static final String TEST_STRING_BODY = "test-string-body";
+
+   private byte[] encodedProtonMessage; // assigned in setUp() from encodeMessage(createProtonMessage())
+
+   /**
+    * Encodes the message produced by createProtonMessage() and stores the
+    * resulting bytes so each test starts from a freshly encoded copy.
+    */
+   @Before
+   public void setUp() {
+      final MessageImpl protonMessage = createProtonMessage();
+      encodedProtonMessage = encodeMessage(protonMessage);
+   }
+
+   //----- Test Message Creation ---------------------------------------------//
+
+   /**
+    * Both byte-array based constructors must yield a durable message that
+    * reports the expected "to" address.
+    */
+   @Test
+   public void testCreateMessageFromEncodedByteArrayData() {
+      // Three argument constructor
+      final AMQPMessage fromThreeArgs = new AMQPMessage(0, encodedProtonMessage, null);
+      assertTrue(fromThreeArgs.isDurable());
+      assertEquals(TEST_TO_ADDRESS, fromThreeArgs.getAddress());
+
+      // Four argument constructor
+      final AMQPMessage fromFourArgs = new AMQPMessage(0, encodedProtonMessage, null, null);
+      assertTrue(fromFourArgs.isDurable());
+      assertEquals(TEST_TO_ADDRESS, fromFourArgs.getAddress());
+   }
+
+   /**
+    * A message built from a ReadableBuffer wrapping the encoded bytes must
+    * expose the durable header flag and the expected "to" address.
+    */
+   @Test
+   public void testCreateMessageFromEncodedReadableBuffer() {
+      AMQPMessage decoded = new AMQPMessage(0, ReadableBuffer.ByteBufferReader.wrap(encodedProtonMessage), null, null);
+
+      // assertTrue matches the style used by the other durability checks in this class
+      assertTrue(decoded.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+   }
+
+   /**
+    * A message built from the encoded bytes plus a TypedProperties instance
+    * must decode normally and report non-null extra properties.
+    */
+   @Test
+   public void testCreateMessageFromEncodedByteArrayDataWithExtraProperties() {
+      AMQPMessage decoded = new AMQPMessage(0, encodedProtonMessage, new TypedProperties(), null);
+
+      // assertTrue matches the style used by the other durability checks in this class
+      assertTrue(decoded.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
+      assertNotNull(decoded.getExtraProperties());
+   }
+
+   /**
+    * A message created with only an ID fails on getProtonMessage() until its
+    * state is reloaded from a persisted buffer; afterwards the reported sizes
+    * must line up with the size of that buffer.
+    *
+    * @throws ActiveMQException declared by the helper calls used in this test
+    */
+   @Test
+   public void testCreateMessageForPersistenceDataReload() throws ActiveMQException {
+      MessageImpl protonMessage = createProtonMessage();
+      ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
+
+      AMQPMessage message = new AMQPMessage(0);
+      try {
+         message.getProtonMessage();
+         fail("Should throw NPE due to not being initialized yet");
+      } catch (NullPointerException expected) {
+         // Expected: nothing has been loaded into the message yet
+      }
+
+      // Captured before the reload, in case reading moves the buffer's reader index
+      final long persistedSize = encoded.readableBytes();
+
+      // Now reload from encoded data
+      message.reloadPersistence(encoded);
+
+      assertEquals(persistedSize, message.getPersistSize());
+      assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
+      assertEquals(persistedSize - Integer.BYTES, message.getEncodeSize());
+      assertTrue(message.getHeader().getDurable());
+      assertEquals(TEST_TO_ADDRESS, message.getAddress());
+   }
+
+   //----- Test Memory Estimate access ---------------------------------------//
+
+   /**
+    * The memory estimate must exceed the encoded size, stay stable across
+    * repeated calls, and change once a property is added and re-encoded.
+    */
+   @Test
+   public void testGetMemoryEstimate() {
+      final AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, new TypedProperties(), null);
+
+      final int initialEstimate = message.getMemoryEstimate();
+      assertTrue(encodedProtonMessage.length < message.getMemoryEstimate());
+      assertEquals(initialEstimate, message.getMemoryEstimate());
+
+      final SimpleString propertyName = new SimpleString("newProperty");
+      message.putStringProperty(propertyName, "newValue");
+      message.reencode();
+
+      assertNotEquals(initialEstimate, message.getMemoryEstimate());
+   }
+
+   //----- Test Connection ID access -----------------------------------------//
+
+   /**
+    * A message decoded from an empty proton message carries no connection ID.
+    */
+   @Test
+   public void testGetConnectionID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getConnectionID());
+   }
+
+   /**
+    * A connection ID set on the message must be returned by the getter.
+    */
+   @Test
+   public void testSetConnectionID() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      final String ID = UUID.randomUUID().toString();
+
+      assertNull(decoded.getConnectionID());
+      decoded.setConnectionID(ID);
+      assertEquals(ID, decoded.getConnectionID());
+   }
+
+   /**
+    * A connection ID set on the message must also be readable as the string
+    * property named by MessageUtil.CONNECTION_ID_PROPERTY_NAME.
+    */
+   @Test
+   public void testGetConnectionIDFromProperties() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      final String ID = UUID.randomUUID().toString();
+
+      assertNull(decoded.getConnectionID());
+      decoded.setConnectionID(ID);
+      assertEquals(ID, decoded.getConnectionID());
+      assertEquals(ID, decoded.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME));
+   }
+
+   //----- Test LastValueProperty access -------------------------------//
+
+   /**
+    * A message decoded from an empty proton message has no last value property.
+    */
+   @Test
+   public void testGetLastValueFromMessageWithNone() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getLastValueProperty());
+   }
+
+   /**
+    * A last value property set on a message that had none must be returned
+    * by the getter.
+    */
+   @Test
+   public void testSetLastValueFromMessageWithNone() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      final SimpleString expectedLastValue = new SimpleString("last-address");
+
+      assertNull(message.getLastValueProperty());
+
+      message.setLastValueProperty(expectedLastValue);
+
+      assertEquals(expectedLastValue, message.getLastValueProperty());
+   }
+
+   //----- Test User ID access -----------------------------------------//
+
+   /**
+    * With no Properties section the user ID is null, and it stays null even
+    * after setUserID has been called.
+    */
+   @Test
+   public void getUserIDWhenNoPropertiesExists() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      final String randomId = UUID.randomUUID().toString();
+
+      assertNull(message.getUserID());
+      message.setUserID(randomId);
+      assertNull(message.getUserID());
+   }
+
+   /**
+    * Setting a user ID and re-encoding must not create a Properties section
+    * on a message that had none.
+    */
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenNotPresent() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      final String userId = UUID.randomUUID().toString();
+
+      // Before: neither a user ID nor a Properties section
+      assertNull(message.getUserID());
+      assertNull(message.getProperties());
+
+      message.setUserID(userId);
+      message.reencode();
+
+      // After: still neither
+      assertNull(message.getUserID());
+      assertNull(message.getProperties());
+   }
+
+   /**
+    * Setting a user ID and re-encoding must not populate the message ID of an
+    * existing, empty Properties section.
+    */
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentButNoMessageID() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      final String userId = UUID.randomUUID().toString();
+
+      // Before: Properties exist but carry no message ID
+      assertNull(message.getUserID());
+      assertNotNull(message.getProperties());
+      assertNull(message.getProperties().getMessageId());
+
+      message.setUserID(userId);
+      message.reencode();
+
+      // After: unchanged
+      assertNull(message.getUserID());
+      assertNotNull(message.getProperties());
+      assertNull(message.getProperties().getMessageId());
+   }
+
+   /**
+    * When the Properties section already carries a message ID, calling
+    * setUserID must leave the reported user ID unchanged.
+    */
+   @Test
+   public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentWithMessageID() {
+      final String ID = UUID.randomUUID().toString();
+      // A different value, so the final assertions would fail if setUserID took effect
+      final String OTHER_ID = UUID.randomUUID().toString();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setMessageId(ID);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNotNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNotNull(decoded.getProperties().getMessageId());
+      assertEquals(ID, decoded.getUserID());
+
+      decoded.setUserID(OTHER_ID);
+      decoded.reencode();
+
+      assertNotNull(decoded.getUserID());
+      assertNotNull(decoded.getProperties());
+      assertNotNull(decoded.getProperties().getMessageId());
+      assertEquals(ID, decoded.getUserID());
+   }
+
+   //----- Test the getDuplicateProperty methods -----------------------------//
+
+   /**
+    * A message decoded from an empty proton message has no duplicate property.
+    */
+   @Test
+   public void testGetDuplicateProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertNull(decoded.getDuplicateProperty());
+   }
+
+   //----- Test the getAddress methods ---------------------------------------//
+
+   /**
+    * The address set on the proton message must be returned by getAddress().
+    */
+   @Test
+   public void testGetAddressFromMessage() {
+      final String expectedAddress = "myQueue";
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setAddress(expectedAddress);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedAddress, message.getAddress());
+   }
+
+   /**
+    * The address set on the proton message must be returned, in string form,
+    * by getAddressSimpleString().
+    */
+   @Test
+   public void testGetAddressSimpleStringFromMessage() {
+      final String expectedAddress = "myQueue";
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setAddress(expectedAddress);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedAddress, message.getAddressSimpleString().toString());
+   }
+
+   /**
+    * With no address on the proton message, both address getters return null.
+    */
+   @Test
+   public void testGetAddressFromMessageWithNoValueSet() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getAddress());
+      assertNull(message.getAddressSimpleString());
+   }
+
+   /**
+    * Setting a new address on a decoded message replaces the one it was
+    * encoded with.
+    */
+   @Test
+   public void testSetAddressFromMessage() {
+      final String originalAddress = "myQueue";
+      final SimpleString updatedAddress = new SimpleString("myQueue-1");
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setAddress(originalAddress);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      assertEquals(originalAddress, message.getAddress());
+
+      message.setAddress(updatedAddress);
+
+      assertEquals(updatedAddress, message.getAddressSimpleString());
+   }
+
+   /**
+    * After changing the address and re-encoding, the new address must be
+    * visible through the message API and in the Properties "to" field.
+    */
+   @Test
+   public void testSetAddressFromMessageUpdatesPropertiesOnReencode() {
+      final String originalAddress = "myQueue";
+      final SimpleString updatedAddress = new SimpleString("myQueue-1");
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setAddress(originalAddress);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      assertEquals(originalAddress, message.getAddress());
+
+      message.setAddress(updatedAddress);
+      message.reencode();
+
+      assertEquals(updatedAddress.toString(), message.getProperties().getTo());
+      assertEquals(updatedAddress, message.getAddressSimpleString());
+   }
+
+   //----- Test the durability set and get methods ---------------------------//
+
+   /**
+    * A header with durable set to true yields a durable message.
+    */
+   @Test
+   public void testIsDurableFromMessageWithHeaderTaggedAsTrue() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setDurable(true);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertTrue(message.isDurable());
+   }
+
+   /**
+    * A header with durable set to false yields a non-durable message.
+    */
+   @Test
+   public void testIsDurableFromMessageWithHeaderTaggedAsFalse() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setDurable(false);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertFalse(message.isDurable());
+   }
+
+   /**
+    * A message with no durable value set is reported as non-durable.
+    */
+   @Test
+   public void testIsDurableFromMessageWithNoValueSet() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertFalse(message.isDurable());
+   }
+
+   /**
+    * isDurable() flips to true as soon as setDurable(true) is called, without
+    * a re-encode.
+    */
+   @Test
+   public void testIsDuranleReturnsTrueOnceUpdated() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      assertFalse(message.isDurable());
+
+      message.setDurable(true);
+
+      assertTrue(message.isDurable());
+   }
+
+   /**
+    * Marking a message with an empty header as durable and re-encoding must
+    * write the durable flag into that header.
+    */
+   @Test
+   public void testNonDurableMessageReencodedToDurable() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      // Before the update the header carries no durable value at all
+      assertFalse(message.isDurable());
+      assertNull(message.getHeader().getDurable());
+
+      message.setDurable(true);
+      message.reencode();
+
+      // After the re-encode the header reflects the change
+      assertTrue(message.isDurable());
+      assertTrue(message.getHeader().getDurable());
+   }
+
+   /**
+    * A message with no header must gain one, flagged durable, after
+    * setDurable(true) and a re-encode.
+    */
+   @Test
+   public void testMessageWithNoHeaderGetsOneWhenDurableSetAndReencoded() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      // Before the update there is no header section
+      assertFalse(message.isDurable());
+      assertNull(message.getHeader());
+
+      message.setDurable(true);
+      message.reencode();
+      assertTrue(message.isDurable());
+
+      // After the re-encode a header exists and is durable
+      final Header reencodedHeader = message.getHeader();
+      assertNotNull(reencodedHeader);
+      assertTrue(reencodedHeader.getDurable());
+   }
+
+   //----- Test RoutingType access -------------------------------------------//
+
+   /**
+    * A message with no annotations reports no routing type.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithoutIt() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getRoutingType());
+   }
+
+   /**
+    * A routing type set on the message must be returned by the getter.
+    */
+   @Test
+   public void testSetRoutingType() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      final RoutingType expectedType = RoutingType.ANYCAST;
+
+      assertNull(message.getRoutingType());
+
+      message.setRoutingType(expectedType);
+
+      assertEquals(expectedType, message.getRoutingType());
+   }
+
+   /**
+    * Setting the routing type to null clears a previously set value.
+    */
+   @Test
+   public void testSetRoutingTypeToClear() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      final RoutingType initialType = RoutingType.ANYCAST;
+
+      assertNull(message.getRoutingType());
+
+      message.setRoutingType(initialType);
+      assertEquals(initialType, message.getRoutingType());
+
+      message.setRoutingType(null);
+      assertNull(message.getRoutingType());
+   }
+
+   /**
+    * Clearing the routing type of a message encoded with one, then
+    * re-encoding, must leave the message annotations empty.
+    */
+   @Test
+   public void testRemoveRoutingTypeFromMessageEncodedWithOne() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      assertEquals(RoutingType.ANYCAST, message.getRoutingType());
+
+      message.setRoutingType(null);
+      message.reencode();
+
+      assertNull(message.getRoutingType());
+      assertTrue(message.getMessageAnnotations().getValue().isEmpty());
+   }
+
+   /**
+    * A ROUTING_TYPE annotation holding the ANYCAST type value is read back as
+    * RoutingType.ANYCAST.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithAnyCastType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.ANYCAST, message.getRoutingType());
+   }
+
+   /**
+    * A ROUTING_TYPE annotation holding the MULTICAST type value is read back
+    * as RoutingType.MULTICAST.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithMulticastType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.MULTICAST.getType());
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.MULTICAST, message.getRoutingType());
+   }
+
+   /**
+    * A JMS destination type annotation of QUEUE_TYPE maps to ANYCAST routing.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithQueueType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.QUEUE_TYPE);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.ANYCAST, message.getRoutingType());
+   }
+
+   /**
+    * A JMS destination type annotation of TEMP_QUEUE_TYPE maps to ANYCAST
+    * routing.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithTempQueueType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_QUEUE_TYPE);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.ANYCAST, message.getRoutingType());
+   }
+
+   /**
+    * A JMS destination type annotation of TOPIC_TYPE maps to MULTICAST
+    * routing.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithTopicType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TOPIC_TYPE);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.MULTICAST, message.getRoutingType());
+   }
+
+   /**
+    * A JMS destination type annotation of TEMP_TOPIC_TYPE maps to MULTICAST
+    * routing.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithTempTopicType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_TOPIC_TYPE);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(RoutingType.MULTICAST, message.getRoutingType());
+   }
+
+   /**
+    * A JMS destination type annotation with the value 32 yields no routing
+    * type.
+    */
+   @Test
+   public void testGetRoutingTypeFromMessageWithUnknownType() {
+      final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
+      messageAnnotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, (byte) 32);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setMessageAnnotations(messageAnnotations);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertNull(message.getRoutingType());
+   }
+
+   //----- Test access to message Group ID -----------------------------------//
+
+   /**
+    * The group ID set on the proton message must be returned by getGroupID().
+    */
+   @Test
+   public void testGetGroupIDFromMessage() {
+      final String expectedGroupId = "group-1";
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setGroupId(expectedGroupId);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedGroupId, message.getGroupID().toString());
+   }
+
+   /**
+    * An empty Properties section yields a null group ID.
+    */
+   @Test
+   public void testGetGroupIDFromMessageWithNoGroupId() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertNull(message.getGroupID());
+   }
+
+   /**
+    * A message without a Properties section yields a null group ID.
+    */
+   @Test
+   public void testGetGroupIDFromMessageWithNoProperties() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getGroupID());
+   }
+
+   //----- Test access to message Reply To -----------------------------------//
+
+   /**
+    * The reply-to set on the proton message must be returned by getReplyTo().
+    */
+   @Test
+   public void testGetReplyToFromMessage() {
+      final String expectedReplyTo = "address-1";
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setReplyTo(expectedReplyTo);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedReplyTo, message.getReplyTo().toString());
+   }
+
+   /**
+    * An empty Properties section yields a null reply-to.
+    */
+   @Test
+   public void testGetReplyToFromMessageWithNoReplyTo() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertNull(message.getReplyTo());
+   }
+
+   /**
+    * A message without a Properties section yields a null reply-to.
+    */
+   @Test
+   public void testGetReplyToFromMessageWithNoProperties() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getReplyTo());
+   }
+
+   /**
+    * Setting a reply-to on a message with an empty Properties section and
+    * re-encoding must expose the value via the message and its Properties.
+    */
+   @Test
+   public void testSetReplyToFromMessageWithProperties() {
+      final String expectedReplyTo = "address-1";
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      assertNull(message.getReplyTo());
+
+      final SimpleString replyToAddress = new SimpleString(expectedReplyTo);
+      message.setReplyTo(replyToAddress);
+      message.reencode();
+
+      assertEquals(expectedReplyTo, message.getReplyTo().toString());
+      assertEquals(expectedReplyTo, message.getProperties().getReplyTo());
+   }
+
+   /**
+    * Setting a reply-to on a message without a Properties section and
+    * re-encoding must expose the value via the message and its Properties.
+    */
+   @Test
+   public void testSetReplyToFromMessageWithNoProperties() {
+      final String expectedReplyTo = "address-1";
+
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      assertNull(message.getReplyTo());
+
+      final SimpleString replyToAddress = new SimpleString(expectedReplyTo);
+      message.setReplyTo(replyToAddress);
+      message.reencode();
+
+      assertEquals(expectedReplyTo, message.getReplyTo().toString());
+      assertEquals(expectedReplyTo, message.getProperties().getReplyTo());
+   }
+
+   /**
+    * Setting the reply-to to null and re-encoding must clear it from both the
+    * message and its Properties section.
+    */
+   @Test
+   public void testSetReplyToFromMessageWithPropertiesCanClear() {
+      final String REPLY_TO = "address-1";
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setReplyTo(REPLY_TO);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertEquals(REPLY_TO, decoded.getReplyTo().toString());
+
+      decoded.setReplyTo(null);
+      decoded.reencode();
+
+      assertNull(decoded.getReplyTo());
+      assertNull(decoded.getProperties().getReplyTo());
+   }
+
+   //----- Test access to User ID --------------------------------------------//
+
+   /**
+    * The UTF-8 encoded user ID set on the proton message must be returned as
+    * a string by getAMQPUserID().
+    */
+   @Test
+   public void testGetUserIDFromMessage() {
+      final String expectedUser = "foo";
+      final byte[] encodedUser = expectedUser.getBytes(StandardCharsets.UTF_8);
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setUserId(encodedUser);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedUser, message.getAMQPUserID());
+   }
+
+   /**
+    * A message without a Properties section yields a null AMQP user ID.
+    */
+   @Test
+   public void testGetUserIDFromMessageWithNoProperties() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertNull(message.getAMQPUserID());
+   }
+
+   /**
+    * An empty Properties section yields a null AMQP user ID.
+    */
+   @Test
+   public void testGetUserIDFromMessageWithNoUserID() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertNull(message.getAMQPUserID());
+   }
+
+   //----- Test access message priority --------------------------------------//
+
+   /**
+    * The priority set on the proton message must be returned by getPriority().
+    */
+   @Test
+   public void testGetPriorityFromMessage() {
+      final short expectedPriority = 7;
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setPriority(expectedPriority);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(expectedPriority, message.getPriority());
+   }
+
+   /**
+    * A message without a header reports the default message priority.
+    */
+   @Test
+   public void testGetPriorityFromMessageWithNoHeader() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, message.getPriority());
+   }
+
+   /**
+    * A header with no priority set reports the default message priority.
+    */
+   @Test
+   public void testGetPriorityFromMessageWithNoPrioritySet() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, message.getPriority());
+   }
+
+   /**
+    * Setting the priority on a message with an empty header and re-encoding
+    * must update both the message and its header.
+    */
+   @Test
+   public void testSetPriorityOnMessageWithHeader() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, message.getPriority());
+
+      message.setPriority((byte) 9);
+      message.reencode();
+
+      assertEquals(9, message.getPriority());
+      assertEquals(9, message.getHeader().getPriority().byteValue());
+   }
+
+   /**
+    * Setting the priority on a message without a header and re-encoding must
+    * produce a header carrying that priority.
+    */
+   @Test
+   public void testSetPriorityOnMessageWithoutHeader() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+      assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, message.getPriority());
+
+      message.setPriority((byte) 9);
+      message.reencode();
+
+      assertEquals(9, message.getPriority());
+      assertEquals(9, message.getHeader().getPriority().byteValue());
+   }
+
+   //----- Test access message expiration ------------------------------------//
+
+   /**
+    * A message without a header reports an expiration of zero.
+    */
+   @Test
+   public void testGetExpirationFromMessageWithNoHeader() {
+      final AMQPMessage message = encodeAndDecodeMessage((MessageImpl) Message.Factory.create());
+
+      assertEquals(0, message.getExpiration());
+   }
+
+   /**
+    * A header with no TTL set reports an expiration of zero.
+    */
+   @Test
+   public void testGetExpirationFromMessageWithNoTTLInHeader() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(0, message.getExpiration());
+   }
+
+   /**
+    * An empty header together with an empty Properties section reports an
+    * expiration of zero.
+    */
+   @Test
+   public void testGetExpirationFromMessageWithNoTTLInHeaderOrExpirationInProperties() {
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setProperties(new Properties());
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      assertEquals(0, message.getExpiration());
+   }
+
+   /**
+    * With a TTL of 100000 set in the header, the reported expiration must lie
+    * after the current time.
+    */
+   @Test
+   public void testGetExpirationFromMessageUsingTTL() {
+      final long timeToLive = 100000;
+
+      final MessageImpl source = (MessageImpl) Message.Factory.create();
+      source.setHeader(new Header());
+      source.setTtl(timeToLive);
+      final AMQPMessage message = encodeAndDecodeMessage(source);
+
+      // Expiration is read first, then compared against the clock, as in the original ordering
+      final long expiration = message.getExpiration();
+      assertTrue(expiration > System.currentTimeMillis());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageUsingAbsoluteExpiration() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // The absolute expiry time stored in Properties is expected back unchanged.
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageUsingAbsoluteExpirationNegative() {
+      final Date expirationTime = new Date(-1);
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // A negative absolute expiry time (Date(-1)) is expected to be reported as 0.
+      assertEquals(0, decoded.getExpiration());
+   }
+
+   @Test
+   public void testGetExpirationFromMessageAbsoluteExpirationOVerrideTTL() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+      final long ttl = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setTtl(ttl);
+      Properties properties = new Properties();
+      properties.setAbsoluteExpiryTime(expirationTime);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // Both TTL and absolute expiry are set; the absolute expiry value is the one expected.
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testSetExpiration() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // Starts at 0, then getExpiration() must echo the value passed to setExpiration().
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+   }
+
+   @Test
+   public void testSetExpirationUpdatesProperties() {
+      final Date originalExpirationTime = new Date(System.currentTimeMillis());
+      final Date expirationTime = new Date(originalExpirationTime.getTime() + 10000);
+      // The two times above differ on purpose so a setExpiration() that changes nothing is detected.
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setExpiryTime(originalExpirationTime.getTime());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(originalExpirationTime.getTime(), decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+      // After reencode() the Properties section must carry the updated absolute expiry time.
+      decoded.reencode();
+      assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationAddsPropertiesWhenNonePresent() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(expirationTime.getTime());
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+      // After reencode() a Properties section must exist and carry the new absolute expiry.
+      decoded.reencode();
+      assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationToClearUpdatesPropertiesWhenPresent() {
+      final Date expirationTime = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      protonMessage.setExpiryTime(expirationTime.getTime());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(expirationTime.getTime(), decoded.getExpiration());
+      decoded.setExpiration(-1);
+      assertEquals(0, decoded.getExpiration());
+      // setExpiration(-1) is expected to clear the value; after reencode() the Properties expiry is null.
+      decoded.reencode();
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getProperties().getAbsoluteExpiryTime());
+   }
+
+   @Test
+   public void testSetExpirationToClearDoesNotAddPropertiesWhenNonePresent() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getExpiration());
+      decoded.setExpiration(-1);
+      assertEquals(0, decoded.getExpiration());
+      // Clearing an unset expiration must not cause a Properties section to appear after reencode().
+      decoded.reencode();
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getProperties());
+   }
+
+   @Test
+   public void testSetExpirationToClearUpdateHeader() {
+      final long ttl = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      protonMessage.setTtl(ttl);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getExpiration() > System.currentTimeMillis());
+
+      decoded.setExpiration(-1);
+      decoded.reencode();
+      // Clearing the expiration is expected to also remove the TTL from the re-encoded Header.
+      assertEquals(0, decoded.getExpiration());
+      assertNull(decoded.getHeader().getTtl());
+   }
+
+   //----- Test access message time stamp ------------------------------------//
+
+   @Test
+   public void testGetTimestampFromMessage() {
+      Date timestamp = new Date(System.currentTimeMillis());
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      Properties properties = new Properties();
+      properties.setCreationTime(timestamp);
+      protonMessage.setProperties(properties);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // getTimestamp() is expected to return the creation time stored in Properties.
+      assertEquals(timestamp.getTime(), decoded.getTimestamp());
+   }
+
+   @Test
+   public void testGetTimestampFromMessageWithNoCreateTimeSet() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setHeader(new Header());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // Header only, no creation time in Properties: a timestamp of 0 is expected.
+      assertEquals(0L, decoded.getTimestamp());
+   }
+
+   @Test
+   public void testGetTimestampFromMessageWithNoHeader() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // Message with no sections set at all: a timestamp of 0 is expected.
+      assertEquals(0L, decoded.getTimestamp());
+   }
+
+   @Test
+   public void testSetTimestampOnMessage() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      protonMessage.setProperties(new Properties());
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+
+      Date createTime = new Date(System.currentTimeMillis());
+
+      decoded.setTimestamp(createTime.getTime());
+      decoded.reencode();
+      // The new timestamp must be visible both directly and as the Properties creation time.
+      assertEquals(createTime.getTime(), decoded.getTimestamp());
+      assertEquals(createTime, decoded.getProperties().getCreationTime());
+   }
+
+   @Test
+   public void testSetTimestampOnMessageWithNoPropertiesSection() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0L, decoded.getTimestamp());
+
+      Date createTime = new Date(System.currentTimeMillis());
+
+      decoded.setTimestamp(createTime.getTime());
+      decoded.reencode();
+      // Setting a timestamp on a message without Properties must create that section on reencode().
+      assertNotNull(decoded.getProperties());
+      assertEquals(createTime.getTime(), decoded.getTimestamp());
+      assertEquals(createTime, decoded.getProperties().getCreationTime());
+   }
+
+   //----- Test access to message scheduled delivery time --------------------//
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedTime() {
+      final long scheduledTime = System.currentTimeMillis();
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // An absolute SCHEDULED_DELIVERY_TIME annotation is expected back unchanged.
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedTimeAndDelay() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // When both delay and fixed time annotations are present, the fixed time is the expected result.
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeMessageSentWithFixedDelay() {
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // A delay-only annotation is expected to resolve to a delivery time in the future.
+      assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
+   }
+
+   @Test
+   public void testGetScheduledDeliveryTimeWhenMessageHasNoSetValue() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      assertEquals(0, decoded.getScheduledDeliveryTime().longValue()); // no scheduling annotation => 0 expected
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeWhenNonPresent() {
+      final long scheduledTime = System.currentTimeMillis() + 5000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(0, decoded.getScheduledDeliveryTime().longValue());
+      decoded.setScheduledDeliveryTime(scheduledTime);
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+
+      decoded.reencode();
+      // After reencode() the value must be stored under the SCHEDULED_DELIVERY_TIME annotation.
+      assertEquals(scheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeMessageSentWithFixedTime() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long newScheduledTime = System.currentTimeMillis() + 1000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      // Overwrite the originally sent fixed time and verify the new value survives reencode().
+      decoded.setScheduledDeliveryTime(newScheduledTime);
+      assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      decoded.reencode();
+      assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeMessageSentWithFixedDelay() {
+      final long scheduledDelay = 100000;
+      final long newScheduledTime = System.currentTimeMillis() + 1000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
+      // Replace the delay-derived time with an explicit one and verify it survives reencode().
+      decoded.setScheduledDeliveryTime(newScheduledTime);
+      assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      decoded.reencode();
+      assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+   }
+
+   @Test
+   public void testSetScheduledDeliveryTimeToNoneClearsDelayAndTimeValues() {
+      final long scheduledTime = System.currentTimeMillis();
+      final long scheduledDelay = 100000;
+
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      protonMessage.setMessageAnnotations(annotations);
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
+      // Setting the scheduled time to 0 is expected to remove both the time and the delay annotations.
+      decoded.setScheduledDeliveryTime((long) 0);
+      decoded.reencode();
+      assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
+      assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY));
+   }
+
+   //----- Tests access to Message Annotations -------------------------------//
+
+   @Test
+   public void testGetAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      Object result = message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+      String stringResult = message.getAnnotationString(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+      // The Object and String accessors are expected to agree for the same annotation key.
+      assertEquals(result, stringResult);
+   }
+
+   @Test
+   public void testRemoveAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      assertNotNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
+      message.removeAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
+      assertNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
+
+      message.reencode();
+      // After removal and reencode() the message annotations map is expected to be empty.
+      assertTrue(message.getMessageAnnotations().getValue().isEmpty());
+   }
+
+   @Test
+   public void testSetAnnotation() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null);
+
+      final SimpleString newAnnotation = new SimpleString("testSetAnnotation");
+      final String newValue = "newValue";
+
+      message.setAnnotation(newAnnotation, newValue);
+      assertEquals(newValue, message.getAnnotation(newAnnotation));
+
+      message.reencode();
+      // After reencode() the annotation must be present under the equivalent Symbol key.
+      assertEquals(newValue, message.getMessageAnnotations().getValue().get(Symbol.valueOf(newAnnotation.toString())));
+   }
+
+   //----- Tests accessing Proton Sections from encoded data -----------------//
+
+   @Test
+   public void testGetProtonMessage() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      assertProtonMessageEquals(protonMessage, message.getProtonMessage());
+
+      message.setAnnotation(new SimpleString("testGetProtonMessage"), "1");
+      message.messageChanged();
+      // After adding an annotation and calling messageChanged(), the Proton view must differ from the source.
+      assertProtonMessageNotEquals(protonMessage, message.getProtonMessage());
+   }
+
+   @Test
+   public void testGetHeader() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded Header must be a distinct instance that is equal to the one that was encoded.
+      Header decoded = message.getHeader();
+      assertNotSame(decoded, protonMessage.getHeader());
+      assertHeaderEquals(protonMessage.getHeader(), decoded);
+
+      // Update the values
+      decoded.setDeliveryCount(UnsignedInteger.ZERO);
+      decoded.setTtl(UnsignedInteger.valueOf(255));
+      decoded.setFirstAcquirer(true);
+
+      // Check that the message is unaffected.
+      assertHeaderNotEquals(protonMessage.getHeader(), decoded);
+   }
+
+   @Test
+   public void testGetProperties() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded Properties must be a distinct instance that is equal to the one that was encoded.
+      Properties decoded = message.getProperties();
+      assertNotSame(decoded, protonMessage.getProperties());
+      assertPropertiesEquals(protonMessage.getProperties(), decoded);
+
+      // Update the values
+      decoded.setAbsoluteExpiryTime(new Date(System.currentTimeMillis()));
+      decoded.setGroupSequence(UnsignedInteger.valueOf(255));
+      decoded.setSubject(UUID.randomUUID().toString());
+
+      // Check that the message is unaffected.
+      assertPropertiesNotEquals(protonMessage.getProperties(), decoded);
+   }
+
+   @Test
+   public void testGetDeliveryAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded DeliveryAnnotations must be a distinct instance equal to the encoded one.
+      DeliveryAnnotations decoded = message.getDeliveryAnnotations();
+      assertNotSame(decoded, protonMessage.getDeliveryAnnotations());
+      assertDeliveryAnnotationsEquals(protonMessage.getDeliveryAnnotations(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
+
+      // Check that the message is unaffected.
+      assertDeliveryAnnotationsNotEquals(protonMessage.getDeliveryAnnotations(), decoded);
+   }
+
+   @Test
+   public void testGetMessageAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded MessageAnnotations must be a distinct instance equal to the encoded one.
+      MessageAnnotations decoded = message.getMessageAnnotations();
+      assertNotSame(decoded, protonMessage.getMessageAnnotations());
+      assertMessageAnnotationsEquals(protonMessage.getMessageAnnotations(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test");
+
+      // Check that the message is unaffected.
+      assertMessageAnnotationsNotEquals(protonMessage.getMessageAnnotations(), decoded);
+   }
+
+   @Test
+   public void testGetApplicationProperties() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded ApplicationProperties must be a distinct instance equal to the encoded one.
+      ApplicationProperties decoded = message.getApplicationProperties();
+      assertNotSame(decoded, protonMessage.getApplicationProperties());
+      assertApplicationPropertiesEquals(protonMessage.getApplicationProperties(), decoded);
+
+      // Update the values
+      decoded.getValue().put(UUID.randomUUID().toString(), "test");
+
+      // Check that the message is unaffected.
+      assertApplicationPropertiesNotEquals(protonMessage.getApplicationProperties(), decoded);
+   }
+
+   @Test
+   public void testGetBody() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+
+      Object body = message.getBody();
+      assertTrue(body instanceof AmqpValue);
+      AmqpValue amqpValueBody = (AmqpValue) body;
+      // The decoded body value must be equal to, but not the same instance as, the source value.
+      assertNotNull(amqpValueBody.getValue());
+      assertNotSame(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
+      assertEquals(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
+   }
+
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testGetFooter() {
+      MessageImpl protonMessage = createProtonMessage();
+      Footer footer = new Footer(new HashMap<>());
+      footer.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
+      protonMessage.setFooter(footer);
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      // The decoded Footer must be a distinct instance that is equal to the one that was encoded.
+      Footer decoded = message.getFooter();
+      assertNotSame(decoded, protonMessage.getFooter());
+      assertFootersEquals(protonMessage.getFooter(), decoded);
+
+      // Update the values
+      decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
+
+      // Check that the message is unaffected.
+      assertFootersNotEquals(protonMessage.getFooter(), decoded);
+   }
+
+   //----- Test re-encode of updated message sections ------------------------//
+
+   @Test
+   public void testApplicationPropertiesReencodeAfterUpdate() {
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
+
+      decoded.putStringProperty("key-2", "value-2");
+      decoded.reencode();
+
+      assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
+      // Both the pre-existing and the newly added application property must be readable (expected value first).
+      assertEquals(TEST_APPLICATION_PROPERTY_VALUE, decoded.getStringProperty(TEST_APPLICATION_PROPERTY_KEY));
+      assertEquals("value-2", decoded.getStringProperty("key-2"));
+   }
+
+   @Test
+   public void testMessageAnnotationsReencodeAfterUpdate() {
+      final SimpleString TEST_ANNOTATION = new SimpleString("testMessageAnnotationsReencodeAfterUpdate");
+
+      MessageImpl protonMessage = createProtonMessage();
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+
+      assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
+
+      decoded.setAnnotation(TEST_ANNOTATION, "value-2");
+      decoded.reencode();
+
+      assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
+      // The annotation added before reencode() must still be readable (expected value first).
+      assertEquals("value-2", decoded.getAnnotation(TEST_ANNOTATION));
+   }
+
+   //----- Test handling of message extra properties -------------------------//
+
+   @Test
+   public void testExtraByteProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      byte[] value = RandomUtil.randomBytes();
+      SimpleString name = SimpleString.toSimpleString("myProperty");
+
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      // Before any extra property is stored, all accessors are expected to return null.
+      assertNull(decoded.getExtraProperties());
+      assertNull(decoded.getExtraBytesProperty(name));
+      assertNull(decoded.removeExtraBytesProperty(name));
+
+      decoded.putExtraBytesProperty(name, value);
+      assertFalse(decoded.getExtraProperties().isEmpty());
+      // Compare array contents; assertArrayEquals reports the differing index on failure.
+      Assert.assertArrayEquals(value, decoded.getExtraBytesProperty(name));
+      Assert.assertArrayEquals(value, decoded.removeExtraBytesProperty(name));
+   }
+
+   @Test
+   public void testExtraProperty() {
+      MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+
+      byte[] original = RandomUtil.randomBytes();
+      SimpleString name = SimpleString.toSimpleString("myProperty");
+      AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
+      decoded.setAddress("someAddress");
+      decoded.setMessageID(33);
+      decoded.putExtraBytesProperty(name, original);
+      // The extra property must survive conversion to core; compare array contents, not references.
+      ICoreMessage coreMessage = decoded.toCore();
+      Assert.assertArrayEquals(original, coreMessage.getBytesProperty(name));
+      // It must also survive an encode/decode round trip through the message persister.
+      ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
+      try {
+         decoded.getPersister().encode(buffer, decoded);
+         Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
+         AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      } finally {
+         buffer.release();
+      }
+      // Finally it must survive being embedded in, and extracted from, a core message.
+      {
+         ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
+         AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
+         Assert.assertEquals(33, readMessage.getMessageID());
+         Assert.assertEquals("someAddress", readMessage.getAddress());
+         Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
+      }
+   }
+
+   //----- Test that message decode ignores unused sections ------------------//
+
+   private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L); // written as the section descriptor in the malformed-body test
+   private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L); // written as the section descriptor in the malformed-ApplicationProperties test
+   private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L); // written as the section descriptor in the malformed-DeliveryAnnotations test
+
+   @Test
+   public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal a DeliveryAnnotations section but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
+      encodedBytes.writeByte(EncodingCodes.MAP8);
+      encodedBytes.writeByte(2);  // Size
+      encodedBytes.writeByte(2);  // Elements
+      // Use bad encoding code on underlying type of map key which will fail the decode if run
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+
+      try {
+         // This should perform the lazy decode of the DeliveryAnnotations portion of the message
+         message.getDeliveryAnnotations();
+         fail("Should have thrown an error when attempting to decode the DeliveryAnnotations which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal an ApplicationProperties section but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
+      encodedBytes.writeByte(EncodingCodes.MAP8);
+      encodedBytes.writeByte(2);  // Size
+      encodedBytes.writeByte(2);  // Elements
+      // Use bad encoding code on underlying type of map key which will fail the decode if run
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+      // Header-derived state must be readable even though the following section is malformed.
+      assertTrue(message.isDurable());
+
+      try {
+         // This should perform the lazy decode of the ApplicationProperties portion of the message
+         message.getStringProperty("test");
+         fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   @Test
+   public void testPartialDecodeIgnoresBodyByDefault() {
+      Header header = new Header();
+      header.setDurable(true);
+      header.setPriority(UnsignedByte.valueOf((byte) 6));
+
+      ByteBuf encodedBytes = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(encodedBytes);
+
+      EncoderImpl encoder = TLSEncode.getEncoder();
+      encoder.setByteBuffer(writable);
+      encoder.writeObject(header);
+
+      // Signal body of AmqpValue but write corrupt underlying type info
+      encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+      encodedBytes.writeByte(EncodingCodes.SMALLULONG);
+      encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
+      // Use bad encoding code on underlying type
+      encodedBytes.writeByte(255);
+
+      ReadableBuffer readable = new NettyReadable(encodedBytes);
+
+      AMQPMessage message = null;
+      try {
+         message = new AMQPMessage(0, readable, null, null);
+      } catch (Exception decodeError) {
+         fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
+      }
+      // Header-derived state must be readable even though the body section is malformed.
+      assertTrue(message.isDurable());
+
+      try {
+         // This will decode the body section if present in order to present it as a Proton Message object
+         message.getBody();
+         fail("Should have thrown an error when attempting to decode the body which is malformed.");
+      } catch (Exception ex) {
+         // Expected decode to fail when building full message.
+      }
+   }
+
+   //----- Tests for message copy correctness --------------------------------//
+
+   @Test
+   public void testCopyMessage() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy();
+      // copy() is expected to preserve both the Artemis message ID and the AMQP content.
+      assertEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   @Test
+   public void testCopyMessageWithNewArtemisMessageID() {
+      AMQPMessage message = new AMQPMessage(0, encodedProtonMessage, null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy(255);
+      // copy(255) must yield a message ID different from the source's (127) with equal AMQP content.
+      assertNotEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+   }
+
+   @Test
+   public void testCopyMessageDoesNotRemovesMessageAnnotations() {
+      MessageImpl protonMessage = createProtonMessage();
+      DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
+      deliveryAnnotations.getValue().put(Symbol.valueOf("testCopyMessageRemovesMessageAnnotations"), "1");
+      protonMessage.setDeliveryAnnotations(deliveryAnnotations);
+      // NOTE(review): despite the test name, the section exercised here is DeliveryAnnotations.
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(protonMessage), null, null);
+      message.setMessageID(127);
+      AMQPMessage copy = (AMQPMessage) message.copy();
+
+      assertEquals(message.getMessageID(), copy.getMessageID());
+      assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
+      assertNotNull(copy.getDeliveryAnnotations());
+   }
+
+   /**
+    * Decodes a message, copies it, adds annotations to the copy and re-encodes it, checking
+    * after every step that the durable flag and the body value can still be read.
+    */
+   @Test
+   public void testDecodeCopyUpdateReencodeAndThenDecodeAgain() {
+      AMQPMessage decoded = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      // Baseline: the freshly decoded message
+      assertTrue(decoded.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) decoded.getBody()).getValue());
+
+      AMQPMessage copied = (AMQPMessage) decoded.copy();
+
+      // The copy must expose the same values
+      assertTrue(copied.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) copied.getBody()).getValue());
+
+      // Modify the copy with three extra annotations
+      copied.setAnnotation(new SimpleString("x-opt-extra-1"), "test-1");
+      copied.setAnnotation(new SimpleString("x-opt-extra-2"), "test-2");
+      copied.setAnnotation(new SimpleString("x-opt-extra-3"), "test-3");
+
+      // Force a new encoding that includes the modifications
+      copied.reencode();
+
+      // Values must still be readable from the re-encoded message
+      assertTrue(copied.isDurable());
+      assertEquals(TEST_STRING_BODY, ((AmqpValue) copied.getBody()).getValue());
+   }
+
+   //----- Test sendBuffer method --------------------------------------------//
+
+   /**
+    * Writes a message into a Netty buffer via {@code sendBuffer} and checks that a message
+    * decoded from that buffer is equivalent to the source.
+    */
+   @Test
+   public void testSendBuffer() {
+      AMQPMessage source = new AMQPMessage(0, encodedProtonMessage, null, null);
+      ByteBuf target = Unpooled.buffer(255);
+
+      source.sendBuffer(target, 1);
+
+      assertNotNull(target);
+
+      AMQPMessage decoded = new AMQPMessage(0, new NettyReadable(target), null, null);
+
+      assertProtonMessageEquals(source.getProtonMessage(), decoded.getProtonMessage());
+   }
+
+   //----- Test getSendBuffer variations -------------------------------------//
+
+   /**
+    * Checks that {@code getSendBuffer(1)} returns an array-backed buffer whose bytes match the
+    * original encoding and which decodes to an equivalent message.
+    */
+   @Test
+   public void testGetSendBuffer() {
+      AMQPMessage source = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      ReadableBuffer sendBuffer = source.getSendBuffer(1);
+      assertNotNull(sendBuffer);
+      assertTrue(sendBuffer.hasArray());
+
+      // The outgoing bytes are expected to be exactly the bytes the message was built from.
+      assertTrue(Arrays.equals(encodedProtonMessage, sendBuffer.array()));
+
+      AMQPMessage decoded = new AMQPMessage(0, sendBuffer, null, null);
+
+      assertProtonMessageEquals(source.getProtonMessage(), decoded.getProtonMessage());
+   }
+
+   /**
+    * Checks that {@code getSendBuffer(7)} yields an encoding whose header reports a delivery
+    * count of 6 while the source message's header still has no delivery count.
+    */
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessage() {
+      AMQPMessage source = new AMQPMessage(0, encodedProtonMessage, null, null);
+
+      ReadableBuffer sendBuffer = source.getSendBuffer(7);
+      assertNotNull(sendBuffer);
+      source.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage sent = new AMQPMessage(0, sendBuffer, null, null);
+
+      MessageImpl sourceProton = source.getProtonMessage();
+      MessageImpl sentProton = sent.getProtonMessage();
+      assertProtonMessageNotEquals(sourceProton, sentProton);
+
+      assertNull(sourceProton.getHeader().getDeliveryCount());
+      assertEquals(6, sentProton.getHeader().getDeliveryCount().intValue());
+   }
+
+   /**
+    * Same scenario as the delivery count test, but starting from an empty Proton message: the
+    * source must still have no header while the sent encoding reports a delivery count of 6.
+    */
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageOriginalHadNoHeader() {
+      MessageImpl emptyProton = (MessageImpl) Proton.message();
+      AMQPMessage source = new AMQPMessage(0, encodeMessage(emptyProton), null, null);
+
+      ReadableBuffer sendBuffer = source.getSendBuffer(7);
+      assertNotNull(sendBuffer);
+      source.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage sent = new AMQPMessage(0, sendBuffer, null, null);
+
+      MessageImpl sourceProton = source.getProtonMessage();
+      MessageImpl sentProton = sent.getProtonMessage();
+      assertProtonMessageNotEquals(sourceProton, sentProton);
+
+      assertNull(sourceProton.getHeader());
+      assertEquals(6, sentProton.getHeader().getDeliveryCount().intValue());
+   }
+
+   /**
+    * Checks that a message encoded with delivery annotations decodes without them when read
+    * back from the buffer returned by {@code getSendBuffer(1)}.
+    */
+   @Test
+   public void testGetSendBufferRemoveDeliveryAnnotations() {
+      DeliveryAnnotations annotations = new DeliveryAnnotations(new HashMap<>());
+      annotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
+
+      MessageImpl sourceProton = createProtonMessage();
+      sourceProton.setDeliveryAnnotations(annotations);
+      AMQPMessage source = new AMQPMessage(0, encodeMessage(sourceProton), null, null);
+
+      ReadableBuffer sendBuffer = source.getSendBuffer(1);
+      assertNotNull(sendBuffer);
+
+      AMQPMessage sent = new AMQPMessage(0, sendBuffer, null, null);
+
+      MessageImpl sentProton = sent.getProtonMessage();
+      assertProtonMessageNotEquals(source.getProtonMessage(), sentProton);
+      assertNull(sentProton.getDeliveryAnnotations());
+   }
+
+   /**
+    * Combines the two send buffer expectations: the sent encoding reports a delivery count of
+    * 6 and carries no delivery annotations, while the source header has no delivery count.
+    */
+   @Test
+   public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageAndTrimsDeliveryAnnotations() {
+      DeliveryAnnotations annotations = new DeliveryAnnotations(new HashMap<>());
+      annotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
+
+      MessageImpl sourceMessage = createProtonMessage();
+      sourceMessage.setDeliveryAnnotations(annotations);
+      AMQPMessage source = new AMQPMessage(0, encodeMessage(sourceMessage), null, null);
+
+      ReadableBuffer sendBuffer = source.getSendBuffer(7);
+      assertNotNull(sendBuffer);
+      source.reencode(); // Ensures Header is current if accidentally updated
+
+      AMQPMessage sent = new AMQPMessage(0, sendBuffer, null, null);
+
+      MessageImpl sourceProton = source.getProtonMessage();
+      MessageImpl sentProton = sent.getProtonMessage();
+      assertProtonMessageNotEquals(sourceProton, sentProton);
+
+      assertNull(sourceProton.getHeader().getDeliveryCount());
+      assertEquals(6, sentProton.getHeader().getDeliveryCount().intValue());
+      assertNull(sentProton.getDeliveryAnnotations());
+   }
+
+   //----- Test reencode method ----------------------------------------------//
+
+   @Test
+   public void testReencodeOnMessageWithNoPayoad() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, false); // no sections populated
+   }
+
+   @Test
+   public void testReencodeOnMessageWithFullPayoad() {
+      doTestMessageReencodeProducesEqualMessage(true, true, true, true, true, true, true); // all seven sections populated
+   }
+
+   @Test
+   public void testReencodeOnMessageWithHeadersOnly() {
+      doTestMessageReencodeProducesEqualMessage(true, false, false, false, false, false, false); // header only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithDeliveryAnnotationsOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, true, false, false, false, false, false); // delivery annotations only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithMessageAnnotationsOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, true, false, false, false, false); // message annotations only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithPropertiesOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, true, false, false, false); // properties only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, false, false); // application properties only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithBodyOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, true, false); // body only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithFooterOnly() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, true); // footer only
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesAndBody() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, false); // application properties + body
+   }
+
+   @Test
+   public void testReencodeOnMessageWithApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, true); // application properties + body + footer
+   }
+
+   @Test
+   public void testReencodeOnMessageWithPropertiesApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, false, true, true, true, true); // properties + application properties + body + footer
+   }
+
+   @Test
+   public void testReencodeOnMessageWithMessageAnnotationsPropertiesApplicationPropertiesAndBodyAndFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, false, true, true, true, true, true); // everything except header and delivery annotations
+   }
+
+   @Test
+   public void testReencodeOnMessageWithDeliveryAnnotationsMessageAnnotationsPropertiesApplicationPropertiesBodyFooter() {
+      doTestMessageReencodeProducesEqualMessage(false, true, true, true, true, true, true); // everything except header
+   }
+
+   /**
+    * Builds a Proton message containing only the requested sections, wraps its encoding in an
+    * {@link AMQPMessage}, calls {@code reencode()} and compares the result with the source.
+    * <p>
+    * When delivery annotations were included the re-encoded message is expected to differ from
+    * the source; in every other case the two are expected to be equal.
+    *
+    * @param header whether to populate the header section
+    * @param deliveryAnnotations whether to populate the delivery annotations section
+    * @param messageAnnotations whether to populate the message annotations section
+    * @param properties whether to populate the properties section
+    * @param applicationProperties whether to populate the application properties section
+    * @param body whether to populate the body with an {@link AmqpValue}
+    * @param footer whether to populate the footer section
+    */
+   @SuppressWarnings("unchecked")
+   private void doTestMessageReencodeProducesEqualMessage(
+      boolean header, boolean deliveryAnnotations, boolean messageAnnotations, boolean properties, boolean applicationProperties, boolean body, boolean footer) {
+
+      MessageImpl source = (MessageImpl) Proton.message();
+
+      if (header) {
+         Header headerSection = new Header();
+         headerSection.setDurable(true);
+         headerSection.setPriority(UnsignedByte.valueOf((byte) 9));
+         source.setHeader(headerSection);
+      }
+
+      if (deliveryAnnotations) {
+         DeliveryAnnotations deliverySection = new DeliveryAnnotations(new HashMap<>());
+         deliverySection.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_DA"), TEST_MESSAGE_ANNOTATION_VALUE);
+         source.setDeliveryAnnotations(deliverySection);
+      }
+
+      if (messageAnnotations) {
+         MessageAnnotations annotationSection = new MessageAnnotations(new HashMap<>());
+         annotationSection.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+         source.setMessageAnnotations(annotationSection);
+      }
+
+      if (properties) {
+         Properties propertiesSection = new Properties();
+         propertiesSection.setCreationTime(new Date(System.currentTimeMillis()));
+         propertiesSection.setTo(TEST_TO_ADDRESS);
+         propertiesSection.setMessageId(UUID.randomUUID());
+         source.setProperties(propertiesSection);
+      }
+
+      if (applicationProperties) {
+         ApplicationProperties applicationSection = new ApplicationProperties(new HashMap<>());
+         applicationSection.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+         source.setApplicationProperties(applicationSection);
+      }
+
+      if (body) {
+         source.setBody(new AmqpValue(TEST_STRING_BODY));
+      }
+
+      if (footer) {
+         Footer footerSection = new Footer(new HashMap<>());
+         footerSection.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_FOOT"), TEST_MESSAGE_ANNOTATION_VALUE);
+         source.setFooter(footerSection);
+      }
+
+      AMQPMessage message = new AMQPMessage(0, encodeMessage(source), null, null);
+
+      message.reencode();
+
+      MessageImpl reencoded = message.getProtonMessage();
+      if (!deliveryAnnotations) {
+         assertProtonMessageEquals(source, reencoded);
+      } else {
+         // The source still carries delivery annotations, so a difference is expected here.
+         assertProtonMessageNotEquals(source, reencoded);
+      }
+   }
+
+   //----- Test Support ------------------------------------------------------//
+
+   /**
+    * Creates a Proton message populated with a durable, priority 9 header, message
+    * annotations, properties (creation time, to address, random UUID message ID),
+    * application properties and an {@link AmqpValue} body. No delivery annotations or
+    * footer are set.
+    *
+    * @return the populated Proton message
+    */
+   private MessageImpl createProtonMessage() {
+      MessageImpl result = (MessageImpl) Proton.message();
+
+      Header headerSection = new Header();
+      headerSection.setDurable(true);
+      headerSection.setPriority(UnsignedByte.valueOf((byte) 9));
+      result.setHeader(headerSection);
+
+      MessageAnnotations annotationSection = new MessageAnnotations(new HashMap<>());
+      annotationSection.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
+      result.setMessageAnnotations(annotationSection);
+
+      Properties propertiesSection = new Properties();
+      propertiesSection.setCreationTime(new Date(System.currentTimeMillis()));
+      propertiesSection.setTo(TEST_TO_ADDRESS);
+      propertiesSection.setMessageId(UUID.randomUUID());
+      result.setProperties(propertiesSection);
+
+      ApplicationProperties applicationSection = new ApplicationProperties(new HashMap<>());
+      applicationSection.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
+      result.setApplicationProperties(applicationSection);
+
+      result.setBody(new AmqpValue(TEST_STRING_BODY));
+
+      return result;
+   }
+
+   /**
+    * Fails the test unless both Proton messages compare equal section by section.
+    */
+   private void assertProtonMessageEquals(MessageImpl left, MessageImpl right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("MessageImpl values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both Proton messages compare equal section by section.
+    */
+   private void assertProtonMessageNotEquals(MessageImpl left, MessageImpl right) {
+      if (isEquals(left, right)) {
+         // The message previously claimed the values "should be equal", contradicting this check.
+         fail("MessageImpl values should not be equal: left{" + left + "} right{" + right + "}");
+      }
+   }
+
+   /**
+    * Compares two Proton messages section by section.
+    *
+    * @return {@code true} when both are {@code null} or every section comparison passes,
+    *         {@code false} when only one is {@code null} or any comparison fails or throws
+    */
+   private boolean isEquals(MessageImpl left, MessageImpl right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      try {
+         assertHeaderEquals(left.getHeader(), right.getHeader());
+         assertDeliveryAnnotationsEquals(left.getDeliveryAnnotations(), right.getDeliveryAnnotations());
+         assertMessageAnnotationsEquals(left.getMessageAnnotations(), right.getMessageAnnotations());
+         assertPropertiesEquals(left.getProperties(), right.getProperties());
+         assertApplicationPropertiesEquals(left.getApplicationProperties(), right.getApplicationProperties());
+         assertTrue(isEquals(left.getBody(), right.getBody()));
+         assertFootersEquals(left.getFooter(), right.getFooter());
+         return true;
+      } catch (Throwable mismatch) {
+         // Any assertion failure (or other error) raised above is reported as "not equal".
+         return false;
+      }
+   }
+
+   /**
+    * Fails the test unless both headers compare equal.
+    */
+   private void assertHeaderEquals(Header left, Header right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("Header values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both headers compare equal.
+    */
+   private void assertHeaderNotEquals(Header left, Header right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("Header values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares durable, delivery count, first acquirer, priority and TTL of two headers.
+    *
+    * @return {@code true} when both are {@code null} or all five fields match,
+    *         {@code false} otherwise
+    */
+   private boolean isEquals(Header left, Header right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      try {
+         assertEquals(left.getDurable(), right.getDurable());
+         assertEquals(left.getDeliveryCount(), right.getDeliveryCount());
+         assertEquals(left.getFirstAcquirer(), right.getFirstAcquirer());
+         assertEquals(left.getPriority(), right.getPriority());
+         assertEquals(left.getTtl(), right.getTtl());
+         return true;
+      } catch (Throwable mismatch) {
+         // A failed field assertion means the headers differ.
+         return false;
+      }
+   }
+
+   /**
+    * Fails the test unless both properties sections compare equal.
+    */
+   private void assertPropertiesEquals(Properties left, Properties right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("Properties values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both properties sections compare equal.
+    */
+   private void assertPropertiesNotEquals(Properties left, Properties right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("Properties values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares every field of two properties sections.
+    *
+    * @return {@code true} when both are {@code null} or all fields match,
+    *         {@code false} when only one is {@code null} or any field differs
+    */
+   private boolean isEquals(Properties left, Properties right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      try {
+         assertEquals(left.getAbsoluteExpiryTime(), right.getAbsoluteExpiryTime());
+         // Content encoding must be checked against the other side's content encoding; it was
+         // previously compared with the right side's absolute expiry time.
+         assertEquals(left.getContentEncoding(), right.getContentEncoding());
+         assertEquals(left.getContentType(), right.getContentType());
+         assertEquals(left.getCorrelationId(), right.getCorrelationId());
+         assertEquals(left.getCreationTime(), right.getCreationTime());
+         assertEquals(left.getGroupId(), right.getGroupId());
+         assertEquals(left.getGroupSequence(), right.getGroupSequence());
+         assertEquals(left.getMessageId(), right.getMessageId());
+         assertEquals(left.getReplyTo(), right.getReplyTo());
+         assertEquals(left.getReplyToGroupId(), right.getReplyToGroupId());
+         assertEquals(left.getSubject(), right.getSubject());
+         assertEquals(left.getUserId(), right.getUserId());
+         assertEquals(left.getTo(), right.getTo());
+      } catch (Throwable e) {
+         // A failed field assertion means the sections differ.
+         return false;
+      }
+
+      return true;
+   }
+
+   /**
+    * Fails the test unless both message annotations sections compare equal.
+    */
+   private void assertMessageAnnotationsEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("MessageAnnotations values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both message annotations sections compare equal.
+    */
+   private void assertMessageAnnotationsNotEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("MessageAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares two message annotations sections by their underlying maps.
+    *
+    * @return {@code true} when both are {@code null}, {@code false} when only one is,
+    *         otherwise the result of comparing the wrapped maps
+    */
+   private boolean isEquals(MessageAnnotations left, MessageAnnotations right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   /**
+    * Fails the test unless both delivery annotations sections compare equal.
+    */
+   private void assertDeliveryAnnotationsEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("DeliveryAnnotations values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both delivery annotations sections compare equal.
+    */
+   private void assertDeliveryAnnotationsNotEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("DeliveryAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares two delivery annotations sections by their underlying maps.
+    *
+    * @return {@code true} when both are {@code null}, {@code false} when only one is,
+    *         otherwise the result of comparing the wrapped maps
+    */
+   private boolean isEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   /**
+    * Fails the test unless both application properties sections compare equal.
+    */
+   private void assertApplicationPropertiesEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("ApplicationProperties values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both application properties sections compare equal.
+    */
+   private void assertApplicationPropertiesNotEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("ApplicationProperties values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares two application properties sections by their underlying maps.
+    *
+    * @return {@code true} when both are {@code null}, {@code false} when only one is,
+    *         otherwise the result of comparing the wrapped maps
+    */
+   private boolean isEquals(ApplicationProperties left, ApplicationProperties right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   /**
+    * Fails the test unless both footers compare equal.
+    */
+   private void assertFootersEquals(Footer left, Footer right) {
+      if (isEquals(left, right)) {
+         return;
+      }
+
+      fail("Footer values should be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Fails the test when both footers compare equal.
+    */
+   private void assertFootersNotEquals(Footer left, Footer right) {
+      if (!isEquals(left, right)) {
+         return;
+      }
+
+      fail("Footer values should not be equal: left{" + left + "} right{" + right + "}");
+   }
+
+   /**
+    * Compares two footers by their underlying maps.
+    *
+    * @return {@code true} when both are {@code null}, {@code false} when only one is,
+    *         otherwise the result of comparing the wrapped maps
+    */
+   private boolean isEquals(Footer left, Footer right) {
+      if (left == null || right == null) {
+         return left == right;
+      }
+
+      return isEquals(left.getValue(), right.getValue());
+   }
+
+   /**
+    * Compares two maps entry by entry.
+    * <p>
+    * Differences are reported through the return value instead of a thrown assertion error,
+    * so the {@code assert...NotEquals} helpers that rely on this method see {@code false}
+    * for differing maps rather than an unexpected failure. A key that is missing on one
+    * side is never treated as equal to a key mapped to {@code null} on the other.
+    *
+    * @return {@code true} when both maps are {@code null} or contain equal entries,
+    *         {@code false} otherwise
+    */
+   private boolean isEquals(Map<?, ?> left, Map<?, ?> right) {
+      if (left == null && right == null) {
+         return true;
+      }
+
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      if (left.size() != right.size()) {
+         return false;
+      }
+
+      // Sizes match, so it is enough that every left entry has an equal counterpart on the right.
+      for (Map.Entry<?, ?> entry : left.entrySet()) {
+         Object key = entry.getKey();
+         if (!right.containsKey(key)) {
+            return false;
+         }
+
+         Object leftValue = entry.getValue();
+         Object rightValue = right.get(key);
+         if (leftValue == null ? rightValue != null : !leftValue.equals(rightValue)) {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   /**
+    * Compares two body sections of type {@link AmqpValue}, {@link AmqpSequence} or {@link Data}.
+    * <p>
+    * Null mismatches and unsupported section types yield {@code false}; a difference in section
+    * class or in content is raised as an assertion error, which callers are expected to catch.
+    *
+    * @return {@code true} when both sections are {@code null} or carry equal content,
+    *         {@code false} when only one side (or its value) is {@code null} or the section
+    *         type is not one of the three handled here
+    */
+   @SuppressWarnings("unchecked")
+   private boolean isEquals(Section left, Section right) {
+      if (left == null && right == null) {
+         return true;
+      }
+      if (!isNullnessEquals(left, right)) {
+         return false;
+      }
+
+      assertTrue(left.getClass().equals(right.getClass()));
+
+      if (left instanceof AmqpValue) {
+         AmqpValue leftValue = (AmqpValue) left;
+         AmqpValue rightValue = (AmqpValue) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         assertEquals(leftValue.getValue(), rightValue.getValue());
+      } else if (left instanceof AmqpSequence) {
+         AmqpSequence leftValue = (AmqpSequence) left;
+         AmqpSequence rightValue = (AmqpSequence) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         // The right list must come from the right section; reading it from the left one
+         // compared the sequence with itself and could never detect a difference.
+         List<Object> leftList = leftValue.getValue();
+         List<Object> rightList = rightValue.getValue();
+
+         assertEquals(leftList.size(), rightList.size());
+
+         for (int i = 0; i < leftList.size(); ++i) {
+            assertEquals(leftList.get(i), rightList.get(i));
+         }
+      } else if (left instanceof Data) {
+         Data leftValue = (Data) left;
+         Data rightValue = (Data) right;
+
+         if (leftValue.getValue() == null && rightValue.getValue() == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
+            return false;
+         }
+
+         // NOTE(review): this compares the whole backing arrays; presumably fine while the
+         // binaries are not slices of larger arrays - confirm if offsets are ever used.
+         byte[] leftArray = leftValue.getValue().getArray();
+         byte[] rightArray = rightValue.getValue().getArray();
+
+         // Both sides must be checked; testing the left array twice skipped the right one.
+         if (leftArray == null && rightArray == null) {
+            return true;
+         }
+         if (!isNullnessEquals(leftArray, rightArray)) {
+            return false;
+         }
+
+         assertTrue(Arrays.equals(leftArray, rightArray));
+      } else {
+         return false;
+      }
+
+      return true;
+   }
+
+   /**
+    * Tells whether both arguments agree on being {@code null}.
+    *
+    * @return {@code true} when both are {@code null} or both are non-null,
+    *         {@code false} when exactly one of them is {@code null}
+    */
+   private boolean isNullnessEquals(Object left, Object right) {
+      return (left == null) == (right == null);
+   }
+
+   /**
+    * Encodes the Proton message and returns it in a buffer whose first four bytes hold the
+    * length of the encoding, followed by the encoded bytes.
+    *
+    * @param message the Proton message to encode
+    * @return a buffer with reader index 0 and writer index at the end of the data
+    */
+   private ActiveMQBuffer encodeMessageAsPersistedBuffer(MessageImpl message) {
+      ByteBuf encoded = Unpooled.buffer(1500);
+      message.encode(new NettyWritable(encoded));
+
+      // Reserve room at the front of the array for the int size prefix written below.
+      byte[] persisted = new byte[encoded.writerIndex() + Integer.BYTES];
+      encoded.readBytes(persisted, Integer.BYTES, encoded.readableBytes());
+
+      ActiveMQBuffer result = ActiveMQBuffers.wrappedBuffer(persisted);
+      result.writerIndex(0);
+      result.writeInt(persisted.length - Integer.BYTES);
+      result.setIndex(0, persisted.length);
+
+      return result;
+   }
+
+   /**
+    * Encodes the Proton message into a byte array sized exactly to the encoding.
+    *
+    * @param message the Proton message to encode
+    * @return the encoded bytes
+    */
+   private byte[] encodeMessage(MessageImpl message) {
+      ByteBuf scratch = Unpooled.buffer(1500);
+      message.encode(new NettyWritable(scratch));
+
+      byte[] encoded = new byte[scratch.writerIndex()];
+      scratch.readBytes(encoded);
+
+      return encoded;
+   }
+
+   /**
+    * Encodes the Proton message and wraps the resulting bytes in a new {@link AMQPMessage}.
+    *
+    * @param message the Proton message to encode
+    * @return an AMQP message built from the encoded bytes
+    */
+   private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
+      // encodeMessage performs the same encode-to-byte-array steps this method used inline.
+      return new AMQPMessage(0, encodeMessage(message), null);
+   }
+}