You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/26 13:19:51 UTC

[4/5] activemq-artemis git commit: ARTEMIS-2096 Refactor AMQMessage abstraction

ARTEMIS-2096 Refactor AMQMessage abstraction

Major refactoring of the AMQPMessage abstraction to resolve
some issues of message corruption still present in the code and
improve the API handling of message changes and re-encoding.

Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done.  Fixes issues of
corrupt state on copy of message or other changes in filters.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a851a8f9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a851a8f9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a851a8f9

Branch: refs/heads/master
Commit: a851a8f93f30972d252f2bff0bb3d5847cfd7b5f
Parents: 2453978
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 25 12:22:19 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 26 09:19:40 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 1602 +++++++------
 .../amqp/broker/AMQPMessagePersister.java       |    3 -
 .../amqp/converter/AMQPMessageSupport.java      |   31 +
 .../amqp/converter/AmqpCoreConverter.java       |  149 +-
 .../protocol/amqp/proton/AmqpSupport.java       |    1 -
 .../protocol/amqp/util/NettyWritable.java       |    6 +-
 .../protocol/amqp/broker/AMQPMessageTest.java   | 2231 ++++++++++++++++++
 .../amqp/converter/TestConversions.java         |   38 +-
 .../JMSMappingInboundTransformerTest.java       |  124 +-
 .../JMSMappingOutboundTransformerTest.java      |   54 +-
 .../JMSTransformationSpeedComparisonTest.java   |   47 +-
 .../message/MessageTransformationTest.java      |   68 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |  438 ----
 .../protocol/amqp/util/NettyWritableTest.java   |   23 +
 .../amqp/AmqpExpiredMessageTest.java            |   16 +-
 .../integration/journal/MessageJournalTest.java |   22 +-
 16 files changed, 3491 insertions(+), 1362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index cff5229..821356a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,9 +34,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
-import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.protocol.amqp.converter.AmqpCoreConverter;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
@@ -48,12 +49,18 @@ import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.TypeConstructor;
 import org.apache.qpid.proton.codec.WritableBuffer;
@@ -69,111 +76,283 @@ public class AMQPMessage extends RefCountMessage {
 
    public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
 
+   public static final int DEFAULT_MESSAGE_FORMAT = 0;
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
-   final long messageFormat;
-   ReadableBuffer data;
-   boolean bufferValid;
-   Boolean durable;
-   long messageID;
-   SimpleString address;
-   MessageImpl protonMessage;
+   private static final int VALUE_NOT_PRESENT = -1;
+
+   // Buffer and state for the data backing this message.
+   private ReadableBuffer data;
+   private boolean messageDataScanned;
+
+   // Marks the message as needing to be re-encoded to update the backing buffer
+   private boolean modified;
+
+   // Track locations of the message sections for later use and track the size
+   // of the header and delivery annotations if present so we can easily exclude
+   // the delivery annotations later and perform efficient encodes or copies.
+   private int headerPosition = VALUE_NOT_PRESENT;
+   private int encodedHeaderSize;
+   private int deliveryAnnotationsPosition = VALUE_NOT_PRESENT;
+   private int encodedDeliveryAnnotationsSize;
+   private int messageAnnotationsPosition = VALUE_NOT_PRESENT;
+   private int propertiesPosition = VALUE_NOT_PRESENT;
+   private int applicationPropertiesPosition = VALUE_NOT_PRESENT;
+   private int remainingBodyPosition = VALUE_NOT_PRESENT;
+
+   // Message level meta data
+   private final long messageFormat;
+   private long messageID;
+   private SimpleString address;
    private volatile int memoryEstimate = -1;
-   private long expiration = 0;
-
-   // Records where the Header section ends if present.
-   private int headerEnds = 0;
-
-   // Records where the message payload starts, ignoring DeliveryAnnotations if present
-   private int messagePaylodStart = 0;
+   private long expiration;
+   private long scheduledTime = -1;
 
-   private boolean parsedHeaders = false;
-   private Header _header;
-   private DeliveryAnnotations _deliveryAnnotations;
-   private MessageAnnotations _messageAnnotations;
-   private Properties _properties;
-   private int deliveryAnnotationsPosition = -1;
-   private int appLocation = -1;
+   // The Proton based AMQP message sections that are retained in memory, these are the
+   // mutable portions of the Message as the broker sees it, although AMQP defines that
+   // the Properties and ApplicationProperties are immutable so care should be taken
+   // here when making changes to those Sections.
+   private Header header;
+   private MessageAnnotations messageAnnotations;
+   private Properties properties;
    private ApplicationProperties applicationProperties;
-   private long scheduledTime = -1;
+
    private String connectionID;
    private final CoreMessageObjectPools coreMessageObjectPools;
+   private Set<Object> rejectedConsumers;
 
-   Set<Object> rejectedConsumers;
-
-   /** These are properties set at the broker level..
-    *  these are properties created by the broker only */
+   // These are properties set at the broker level and carried only internally by broker storage.
    private volatile TypedProperties extraProperties;
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given in the Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    */
    public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties) {
       this(messageFormat, data, extraProperties, null);
    }
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given in the Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    * @param coreMessageObjectPools
+    *       Object pool used to accelerate some String operations.
+    */
    public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
-      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), extraProperties, coreMessageObjectPools);
+      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(data), extraProperties, coreMessageObjectPools);
    }
 
+   /**
+    * Creates a new {@link AMQPMessage} instance from binary encoded message data.
+    *
+    * @param messageFormat
+    *       The Message format tag given in the Transfer that carried this message
+    * @param data
+    *       The encoded AMQP message in an {@link ReadableBuffer} wrapper.
+    * @param extraProperties
+    *       Broker specific extra properties that should be carried with this message
+    * @param coreMessageObjectPools
+    *       Object pool used to accelerate some String operations.
+    */
    public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
       this.data = data;
       this.messageFormat = messageFormat;
-      this.bufferValid = true;
       this.coreMessageObjectPools = coreMessageObjectPools;
       this.extraProperties = extraProperties == null ? null : new TypedProperties(extraProperties);
-      parseHeaders();
+      ensureMessageDataScanned();
    }
 
-   /** for persistence reload */
-   public AMQPMessage(long messageFormat) {
+   /**
+    * Internal constructor used for persistence reload of the message.
+    * <p>
+    * The message will not be usable until the persistence mechanism populates the message
+    * data and triggers a parse of the message contents to fill in the message state.
+    *
+    * @param messageFormat
+    *       The Message format tag given in the Transfer that carried this message
+    */
+   AMQPMessage(long messageFormat) {
       this.messageFormat = messageFormat;
-      this.bufferValid = false;
+      this.modified = true;  // No buffer yet so this indicates invalid state.
       this.coreMessageObjectPools = null;
    }
 
-   public AMQPMessage(long messageFormat, Message message) {
-      this.messageFormat = messageFormat;
-      this.protonMessage = (MessageImpl) message;
-      this.bufferValid = false;
-      this.coreMessageObjectPools = null;
+   // Access to the AMQP message data using safe copies freshly decoded from the current
+   // AMQP message data stored in this message wrapper.  Changes to these values cannot
+   // be used to influence the underlying AMQP message data, the standard AMQPMessage API
+   // must be used to make changes to mutable portions of the message.
+
+   /**
+    * Creates and returns a Proton-J MessageImpl wrapper around the message data. Changes to
+    * the returned Message are not reflected in this message.
+    *
+    * @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
+    */
+   public MessageImpl getProtonMessage() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+
+      MessageImpl protonMessage = null;
+      if (data != null) {
+         protonMessage = (MessageImpl) Message.Factory.create();
+         data.rewind();
+         protonMessage.decode(data.duplicate());
+      }
+
+      return protonMessage;
    }
 
-   public AMQPMessage(Message message) {
-      this(0, message);
+   /**
+    * Returns a copy of the message Header if one is present, changes to the returned
+    * Header instance do not affect the original Message.
+    *
+    * @return a copy of the Message Header if one exists or null if none present.
+    */
+   public Header getHeader() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(headerPosition, Header.class);
    }
 
-   public MessageImpl getProtonMessage() {
-      if (protonMessage == null) {
-         protonMessage = (MessageImpl) Message.Factory.create();
+   /**
+    * Returns a copy of the DeliveryAnnotations in the message if present or null.  Changes to the
+    * returned DeliveryAnnotations instance do not affect the original Message.
+    *
+    * @return a copy of the {@link DeliveryAnnotations} present in the message or null if none present.
+    */
+   public DeliveryAnnotations getDeliveryAnnotations() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
+   }
 
-         if (data != null) {
-            data.rewind();
-            protonMessage.decode(data.duplicate());
-            this._header = protonMessage.getHeader();
-            protonMessage.setHeader(null);
+   /**
+    * Returns a copy of the MessageAnnotations in the message if present or null.  Changes to the
+    * returned MessageAnnotations instance do not affect the original Message.
+    *
+    * @return a copy of the {@link MessageAnnotations} present in the message or null if none present.
+    */
+   public MessageAnnotations getMessageAnnotations() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
+   }
+
+   /**
+    * Returns a copy of the message Properties if one is present, changes to the returned
+    * Properties instance do not affect the original Message.
+    *
+    * @return a copy of the Message Properties if one exists or null if none present.
+    */
+   public Properties getProperties() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(propertiesPosition, Properties.class);
+   }
+
+   /**
+    * Returns a copy of the {@link ApplicationProperties} present in the message if present or null.
+    * Changes to the returned ApplicationProperties instance do not affect the original Message.
+    *
+    * @return a copy of the {@link ApplicationProperties} present in the message or null if none present.
+    */
+   public ApplicationProperties getApplicationProperties() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
+   }
+
+   /**
+    * Retrieves the AMQP Section that composes the body of this message by decoding a
+    * fresh copy from the encoded message data.  Changes to the returned value are not
+    * reflected in the value encoded in the original message.
+    *
+    * @return the Section that makes up the body of this message.
+    */
+   public Section getBody() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+
+      // We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
+      // There could also be a Footer and no body so this will prevent a faulty return type in case
+      // of no body or message type we don't handle.
+      return scanForMessageSection(Math.max(0, remainingBodyPosition), AmqpSequence.class, AmqpValue.class, Data.class);
+   }
+
+   /**
+    * Retrieves the AMQP Footer encoded in the data of this message by decoding a
+    * fresh copy from the encoded message data.  Changes to the returned value are not
+    * reflected in the value encoded in the original message.
+    *
+    * @return the Footer that was encoded into this AMQP Message.
+    */
+   public Footer getFooter() {
+      ensureMessageDataScanned();
+      ensureDataIsValid();
+      return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   private <T> T scanForMessageSection(int scanStartPosition, Class...targetTypes) {
+      ensureMessageDataScanned();
+
+      // In cases where we parsed out enough to know the value is not encoded in the message
+      // we can exit without doing any reads or buffer hopping.
+      if (scanStartPosition == VALUE_NOT_PRESENT) {
+         return null;
+      }
+
+      ReadableBuffer buffer = data.duplicate().position(0);
+      final DecoderImpl decoder = TLSEncode.getDecoder();
+
+      buffer.position(scanStartPosition);
+
+      T section = null;
+
+      decoder.setBuffer(buffer);
+      try {
+         while (buffer.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            for (Class<?> type : targetTypes) {
+               if (type.equals(constructor.getTypeClass())) {
+                  section = (T) constructor.readValue();
+                  return section;
+               }
+            }
+
+            constructor.skipValue();
          }
+      } finally {
+         decoder.setBuffer(null);
       }
 
-      return protonMessage;
+      return section;
    }
 
-   private void initalizeObjects() {
-      if (protonMessage == null) {
-         if (data == null) {
-            headerEnds = 0;
-            messagePaylodStart = 0;
-            _header = new Header();
-            _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
-            _properties = new Properties();
-            applicationProperties = new ApplicationProperties(new HashMap<>());
-            protonMessage = (MessageImpl) Message.Factory.create();
-            protonMessage.setApplicationProperties(applicationProperties);
-            protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
-         }
+   private ApplicationProperties lazyDecodeApplicationProperties() {
+      if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
+         applicationProperties = scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
       }
+
+      return applicationProperties;
    }
 
-   private Map<String, Object> getApplicationPropertiesMap() {
-      ApplicationProperties appMap = getApplicationProperties();
+   @SuppressWarnings("unchecked")
+   private Map<String, Object> getApplicationPropertiesMap(boolean createIfAbsent) {
+      ApplicationProperties appMap = lazyDecodeApplicationProperties();
       Map<String, Object> map = null;
 
       if (appMap != null) {
@@ -181,190 +360,359 @@ public class AMQPMessage extends RefCountMessage {
       }
 
       if (map == null) {
-         map = new HashMap<>();
-         this.applicationProperties = new ApplicationProperties(map);
+         if (createIfAbsent) {
+            map = new HashMap<>();
+            this.applicationProperties = new ApplicationProperties(map);
+         } else {
+            map = Collections.EMPTY_MAP;
+         }
       }
 
       return map;
    }
 
-   private ApplicationProperties getApplicationProperties() {
-      parseHeaders();
+   @SuppressWarnings("unchecked")
+   private Map<Symbol, Object> getMessageAnnotationsMap(boolean createIfAbsent) {
+      Map<Symbol, Object> map = null;
 
-      if (applicationProperties == null && appLocation >= 0) {
-         ReadableBuffer buffer = data.duplicate();
-         buffer.position(appLocation);
-         TLSEncode.getDecoder().setBuffer(buffer);
-         Object section = TLSEncode.getDecoder().readObject();
-         applicationProperties = (ApplicationProperties) section;
-         appLocation = -1;
-         TLSEncode.getDecoder().setBuffer(null);
+      if (messageAnnotations != null) {
+         map = messageAnnotations.getValue();
       }
 
-      return applicationProperties;
+      if (map == null) {
+         if (createIfAbsent) {
+            map = new HashMap<>();
+            this.messageAnnotations = new MessageAnnotations(map);
+         } else {
+            map = Collections.EMPTY_MAP;
+         }
+      }
+
+      return map;
    }
 
-   private DeliveryAnnotations getDeliveryAnnotations() {
-      parseHeaders();
+   private Object getMessageAnnotation(String annotation) {
+      return getMessageAnnotation(Symbol.getSymbol(annotation));
+   }
 
-      if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) {
-         ReadableBuffer buffer = data.duplicate();
-         buffer.position(deliveryAnnotationsPosition);
-         TLSEncode.getDecoder().setBuffer(buffer);
-         Object section = TLSEncode.getDecoder().readObject();
-         _deliveryAnnotations = (DeliveryAnnotations) section;
-         deliveryAnnotationsPosition = -1;
-         TLSEncode.getDecoder().setBuffer(null);
-      }
+   private Object getMessageAnnotation(Symbol annotation) {
+      return getMessageAnnotationsMap(false).get(annotation);
+   }
 
-      return _deliveryAnnotations;
+   private Object removeMessageAnnotation(Symbol annotation) {
+      return getMessageAnnotationsMap(false).remove(annotation);
    }
 
-   private synchronized void parseHeaders() {
-      if (!parsedHeaders) {
-         if (data == null) {
-            initalizeObjects();
-         } else {
-            partialDecode(data);
+   private void setMessageAnnotation(String annotation, Object value) {
+      setMessageAnnotation(Symbol.getSymbol(annotation), value);
+   }
+
+   private void setMessageAnnotation(Symbol annotation, Object value) {
+      getMessageAnnotationsMap(true).put(annotation, value);
+   }
+
+   // Message decoding and copying methods.  Care must be taken here to ensure the buffer and the
+   // state tracking information is kept up to date.  When the message is manually changed a forced
+   // re-encode should be done to update the backing data with the in memory elements.
+
+   private synchronized void ensureMessageDataScanned() {
+      if (!messageDataScanned) {
+         scanMessageData();
+         messageDataScanned = true;
+      }
+   }
+
+   private synchronized void scanMessageData() {
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      decoder.setBuffer(data.rewind());
+
+      header = null;
+      messageAnnotations = null;
+      properties = null;
+      applicationProperties = null;
+      expiration = 0;
+      encodedHeaderSize = 0;
+      memoryEstimate = -1;
+      scheduledTime = -1;
+      encodedDeliveryAnnotationsSize = 0;
+      headerPosition = VALUE_NOT_PRESENT;
+      deliveryAnnotationsPosition = VALUE_NOT_PRESENT;
+      propertiesPosition = VALUE_NOT_PRESENT;
+      applicationPropertiesPosition = VALUE_NOT_PRESENT;
+      remainingBodyPosition = VALUE_NOT_PRESENT;
+
+      try {
+         while (data.hasRemaining()) {
+            int constructorPos = data.position();
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            if (Header.class.equals(constructor.getTypeClass())) {
+               header = (Header) constructor.readValue();
+               headerPosition = constructorPos;
+               encodedHeaderSize = data.position();
+               if (header.getTtl() != null) {
+                  expiration = System.currentTimeMillis() + header.getTtl().intValue();
+               }
+            } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
+               // Don't decode these as they are not used by the broker at all and are
+               // discarded on send, mark for lazy decode if ever needed.
+               constructor.skipValue();
+               deliveryAnnotationsPosition = constructorPos;
+               encodedDeliveryAnnotationsSize = data.position() - constructorPos;
+            } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
+               messageAnnotationsPosition = constructorPos;
+               messageAnnotations = (MessageAnnotations) constructor.readValue();
+            } else if (Properties.class.equals(constructor.getTypeClass())) {
+               propertiesPosition = constructorPos;
+               properties = (Properties) constructor.readValue();
+
+               if (properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
+                  expiration = properties.getAbsoluteExpiryTime().getTime();
+               }
+            } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
+               // Lazy decoding will start at the TypeConstructor of these ApplicationProperties
+               // but we scan past it to grab the location of the possible body and footer section.
+               applicationPropertiesPosition = constructorPos;
+               constructor.skipValue();
+               remainingBodyPosition = data.hasRemaining() ? data.position() : VALUE_NOT_PRESENT;
+               break;
+            } else {
+               // This will be either the body or a Footer section which will be treated as immutable
+               // and copied as is when re-encoding the message.
+               remainingBodyPosition = constructorPos;
+               break;
+            }
          }
-         parsedHeaders = true;
+      } finally {
+         decoder.setByteBuffer(null);
+         data.rewind();
       }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
-      this.connectionID = connectionID;
-      return this;
+   public org.apache.activemq.artemis.api.core.Message copy() {
+      ensureDataIsValid();
+
+      ReadableBuffer view = data.duplicate().rewind();
+      byte[] newData = new byte[view.remaining()];
+
+      // Copy the full message contents with delivery annotations as they will
+      // be trimmed on send and may become useful on the broker at a later time.
+      data.get(newData);
+
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
+      newEncode.setMessageID(this.getMessageID());
+      return newEncode;
    }
 
    @Override
-   public String getConnectionID() {
-      return connectionID;
+   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
+      return copy().setMessageID(newID);
    }
 
-   public MessageAnnotations getMessageAnnotations() {
-      parseHeaders();
-      return _messageAnnotations;
-   }
+   // Core Message APIs for persisting and encoding of message data along with
+   // utilities for checking memory usage and encoded size characteristics.
 
-   public Header getHeader() {
-      parseHeaders();
-      return _header;
+   /**
+    * Would be called by the Artemis Core components to encode the message into
+    * the provided send buffer.  Because of how Proton message data handling works
+    * this method is not currently used by the AMQP protocol head and will not be
+    * called for out-bound sends.
+    *
+    * @see #getSendBuffer(int) for the actual method used for message sends.
+    */
+   @Override
+   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+      ensureDataIsValid();
+      NettyWritable writable = new NettyWritable(buffer);
+      writable.put(getSendBuffer(deliveryCount));
    }
 
-   public Properties getProperties() {
-      parseHeaders();
-      return _properties;
-   }
+   /**
+    * Gets a ReadableBuffer from the Message that contains the encoded bytes to be sent on the wire.
+    * <p>
+    * When possible this method will present the bytes to the caller without copying them into
+    * a new buffer copy.  If copying is needed a new Netty buffer is created and returned. The
+    * caller should ensure that the reference count on the returned buffer is always decremented
+    * to avoid a leak in the case of a copied buffer being returned.
+    *
+    * @param deliveryCount
+    *       The new delivery count for this message.
+    *
+    * @return a ReadableBuffer containing the encoded bytes of this Message instance.
+    */
+   public ReadableBuffer getSendBuffer(int deliveryCount) {
+      ensureDataIsValid();
 
-   private Object getSymbol(String symbol) {
-      return getSymbol(Symbol.getSymbol(symbol));
+      if (deliveryCount > 1) {
+         return createCopyWithNewDeliveryCount(deliveryCount);
+      } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT) {
+         return createCopyWithoutDeliveryAnnotations();
+      } else {
+         // Common case message has no delivery annotations and this is the first delivery
+         // so no re-encoding or section skipping needed.
+         return data.duplicate();
+      }
    }
 
-   private Object getSymbol(Symbol symbol) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         return mapAnnotations.get(symbol);
-      }
+   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
+      assert deliveryAnnotationsPosition != VALUE_NOT_PRESENT;
 
-      return null;
-   }
+      // The original message had delivery annotations and so we must copy into a new
+      // buffer skipping the delivery annotations section as that is not meant to survive
+      // beyond this hop.
+      ReadableBuffer duplicate = data.duplicate();
 
-   private Object removeSymbol(Symbol symbol) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         return mapAnnotations.remove(symbol);
-      }
+      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+      result.writeBytes(duplicate.limit(encodedHeaderSize).byteBuffer());
+      duplicate.clear();
+      duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
+      result.writeBytes(duplicate.byteBuffer());
 
-      return null;
+      return new NettyReadable(result);
    }
 
-   private void setSymbol(String symbol, Object value) {
-      setSymbol(Symbol.getSymbol(symbol), value);
-   }
+   private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
+      assert deliveryCount > 1;
 
-   private void setSymbol(Symbol symbol, Object value) {
-      MessageAnnotations annotations = getMessageAnnotations();
-      if (annotations == null) {
-         _messageAnnotations = new MessageAnnotations(new HashMap<>());
-         annotations = _messageAnnotations;
-      }
-      Map<Symbol, Object> mapAnnotations = annotations != null ? annotations.getValue() : null;
-      if (mapAnnotations != null) {
-         mapAnnotations.put(symbol, value);
+      final int amqpDeliveryCount = deliveryCount - 1;
+
+      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+
+      // If this is re-delivering the message then the header must be re-encoded
+      // otherwise we want to write the original header if present.  When a
+      // Header is present we need to copy it as we are updating the re-delivered
+      // message and not the stored version which we don't want to invalidate here.
+      Header header = this.header;
+      if (header == null) {
+         header = new Header();
+      } else {
+         header = new Header(header);
       }
+
+      header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
+      TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
+      TLSEncode.getEncoder().writeObject(header);
+      TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+
+      // This will skip any existing delivery annotations that might have been present
+      // in the original message.
+      data.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
+      result.writeBytes(data.byteBuffer());
+      data.position(0);
+
+      return new NettyReadable(result);
    }
 
    @Override
-   public RoutingType getRoutingType() {
-      Object routingType = getSymbol(AMQPMessageSupport.ROUTING_TYPE);
+   public void messageChanged() {
+      modified = true;
+   }
 
-      if (routingType != null) {
-         return RoutingType.getType((byte) routingType);
+   @Override
+   public ByteBuf getBuffer() {
+      if (data == null) {
+         return null;
       } else {
-         routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
-         if (routingType != null) {
-            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
-               return RoutingType.ANYCAST;
-            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
-               return RoutingType.MULTICAST;
-            }
+         if (data instanceof NettyReadable) {
+            return ((NettyReadable) data).getByteBuf();
          } else {
-            return null;
+            return Unpooled.wrappedBuffer(data.byteBuffer());
          }
+      }
+   }
 
-         return null;
+   @Override
+   public AMQPMessage setBuffer(ByteBuf buffer) {
+      // If this is ever called we would be in a highly unfortunate state
+      this.data = null;
+      return this;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      ensureDataIsValid();
+      // The encoded size will exclude any delivery annotations that are present as we will clip them.
+      return data.remaining() - encodedDeliveryAnnotationsSize;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+      // Not used for AMQP messages.
+   }
+
+   @Override
+   public int getMemoryEstimate() {
+      if (memoryEstimate == -1) {
+         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
       }
+
+      return memoryEstimate;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
-      parseHeaders();
-      if (routingType == null) {
-         removeSymbol(AMQPMessageSupport.ROUTING_TYPE);
-      } else {
-         setSymbol(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
+   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
+      try {
+         return AmqpCoreConverter.toCore(
+            this, coreMessageObjectPools, header, messageAnnotations, properties, lazyDecodeApplicationProperties(), getBody(), getFooter());
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
       }
-      return this;
    }
 
    @Override
-   public SimpleString getGroupID() {
-      parseHeaders();
+   public ICoreMessage toCore() {
+      return toCore(coreMessageObjectPools);
+   }
 
-      if (_properties != null && _properties.getGroupId() != null) {
-         return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      ensureDataIsValid();
+      targetRecord.writeInt(internalPersistSize());
+      if (data.hasArray()) {
+         targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
       } else {
-         return null;
+         targetRecord.writeBytes(data.byteBuffer());
       }
    }
 
    @Override
-   public Long getScheduledDeliveryTime() {
+   public int getPersistSize() {
+      ensureDataIsValid();
+      return DataConstants.SIZE_INT + internalPersistSize();
+   }
 
-      if (scheduledTime < 0) {
-         Object objscheduledTime = getSymbol("x-opt-delivery-time");
-         Object objdelay = getSymbol("x-opt-delivery-delay");
+   private int internalPersistSize() {
+      return data.remaining();
+   }
 
-         if (objscheduledTime != null && objscheduledTime instanceof Number) {
-            this.scheduledTime = ((Number) objscheduledTime).longValue();
-         } else if (objdelay != null && objdelay instanceof Number) {
-            this.scheduledTime = System.currentTimeMillis() + ((Number) objdelay).longValue();
-         } else {
-            this.scheduledTime = 0;
-         }
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      byte[] recordArray = new byte[size];
+      record.readBytes(recordArray);
+      data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
+
+      // Message state is now that the underlying buffer is loaded but the contents
+      // not yet scanned, once done the message is fully populated and ready for dispatch.
+      // Force a scan now and tidy the state variables to reflect where we are following
+      // this reload from the store.
+      scanMessageData();
+      messageDataScanned = true;
+      modified = false;
+
+      // Message state should reflect that it came from persistent storage which
+      // can happen when moved to a durable location.  We must re-encode here to
+      // avoid a subsequent redelivery from suddenly appearing with a durable header
+      // tag when the initial delivery did not.
+      if (!isDurable()) {
+         setDurable(true);
+         reencode();
       }
-
-      return scheduledTime;
    }
 
    @Override
-   public AMQPMessage setScheduledDeliveryTime(Long time) {
-      parseHeaders();
-      setSymbol(AMQPMessageSupport.JMS_DELIVERY_TIME, time);
-      return this;
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
    }
 
    @Override
@@ -373,136 +721,143 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public synchronized boolean acceptsConsumer(long consumer) {
-      if (rejectedConsumers == null) {
-         return true;
-      } else {
-         return !rejectedConsumers.contains(consumer);
-      }
-   }
+   public void reencode() {
+      ensureMessageDataScanned();
 
-   @Override
-   public synchronized void rejectConsumer(long consumer) {
-      if (rejectedConsumers == null) {
-         rejectedConsumers = new HashSet<>();
+      // The address was updated on a message with Properties, so update the "to"
+      // field.  When there are no Properties we do not add a Properties section,
+      // which seems wrong but preserves previous behavior.
+      if (properties != null && address != null) {
+         properties.setTo(address.toString());
       }
 
-      rejectedConsumers.add(consumer);
+      encodeMessage();
+      scanMessageData();
+
+      messageDataScanned = true;
+      modified = false;
    }
 
-   private synchronized void partialDecode(ReadableBuffer buffer) {
-      DecoderImpl decoder = TLSEncode.getDecoder();
-      decoder.setBuffer(buffer.rewind());
+   private synchronized void ensureDataIsValid() {
+      assert data != null;
 
-      _header = null;
-      expiration = 0;
-      headerEnds = 0;
-      messagePaylodStart = 0;
-      _deliveryAnnotations = null;
-      _messageAnnotations = null;
-      _properties = null;
-      applicationProperties = null;
-      appLocation = -1;
-      deliveryAnnotationsPosition = -1;
+      if (modified) {
+         encodeMessage();
+         modified = false;
+      }
+   }
+
+   private synchronized void encodeMessage() {
+      int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
+      EncoderImpl encoder = TLSEncode.getEncoder();
 
       try {
-         while (buffer.hasRemaining()) {
-            int constructorPos = buffer.position();
-            TypeConstructor<?> constructor = decoder.readConstructor();
-            if (Header.class.equals(constructor.getTypeClass())) {
-               _header = (Header) constructor.readValue();
-               headerEnds = messagePaylodStart = buffer.position();
-               durable = _header.getDurable();
-               if (_header.getTtl() != null) {
-                  expiration = System.currentTimeMillis() + _header.getTtl().intValue();
-               }
-            } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
-               // Don't decode these as they are not used by the broker at all and are
-               // discarded on send, mark for lazy decode if ever needed.
-               constructor.skipValue();
-               deliveryAnnotationsPosition = constructorPos;
-               messagePaylodStart = buffer.position();
-            } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
-               _messageAnnotations = (MessageAnnotations) constructor.readValue();
-            } else if (Properties.class.equals(constructor.getTypeClass())) {
-               _properties = (Properties) constructor.readValue();
+         NettyWritable writable = new NettyWritable(buffer);
 
-               if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
-                  expiration = _properties.getAbsoluteExpiryTime().getTime();
-               }
+         encoder.setByteBuffer(writable);
+         if (header != null) {
+            encoder.writeObject(header);
+         }
 
-               // Next is either Application Properties or the rest of the message, leave it for
-               // lazy decode of the ApplicationProperties should there be any.  Check first though
-               // as we don't want to actually decode the body which could be expensive.
-               if (buffer.hasRemaining()) {
-                  constructor = decoder.peekConstructor();
-                  if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
-                     appLocation = buffer.position();
-                  }
-               }
-               break;
-            } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
-               // Lazy decoding will start at the TypeConstructor of these ApplicationProperties
-               appLocation = constructorPos;
-               break;
-            } else {
-               break;
+         // We currently do not encode any delivery annotations but it is conceivable
+         // that at some point they may need to be preserved, this is where that needs
+         // to happen.
+
+         if (messageAnnotations != null) {
+            encoder.writeObject(messageAnnotations);
+         }
+         if (properties != null) {
+            encoder.writeObject(properties);
+         }
+
+         // Whenever possible avoid encoding sections we don't need to by
+         // checking if application properties were loaded or added and
+         // encoding only in that case.
+         if (applicationProperties != null) {
+            encoder.writeObject(applicationProperties);
+
+            // Now raw write the remainder body and footer if present.
+            if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+               writable.put(data.position(remainingBodyPosition));
             }
+         } else if (applicationPropertiesPosition != VALUE_NOT_PRESENT) {
+            // Writes out ApplicationProperties, Body and Footer in one go if present.
+            writable.put(data.position(applicationPropertiesPosition));
+         } else if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+            // No Application properties at all so raw write Body and Footer sections
+            writable.put(data.position(remainingBodyPosition));
          }
+
+         byte[] bytes = new byte[buffer.writerIndex()];
+
+         buffer.readBytes(bytes);
+         data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
       } finally {
-         decoder.setByteBuffer(null);
-         buffer.position(0);
+         encoder.setByteBuffer((WritableBuffer) null);
+         buffer.release();
+      }
+   }
+
+   // These methods interact with the Extra Properties that can accompany the message but
+   // because these are not sent on the wire, updates to these do not force a re-encode on
+   // send of the message.
+
+   public TypedProperties createExtraProperties() {
+      if (extraProperties == null) {
+         extraProperties = new TypedProperties();
       }
+      return extraProperties;
    }
 
-   public long getMessageFormat() {
-      return messageFormat;
+   public TypedProperties getExtraProperties() {
+      return extraProperties;
+   }
+
+   public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
+      this.extraProperties = extraProperties;
+      return this;
    }
 
    @Override
-   public void messageChanged() {
-      bufferValid = false;
-      this.data = null;
+   public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
+      createExtraProperties().putBytesProperty(key, value);
+      return this;
    }
 
    @Override
-   public ByteBuf getBuffer() {
-      if (data == null) {
+   public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
          return null;
       } else {
-         return Unpooled.wrappedBuffer(data.byteBuffer());
+         return extraProperties.getBytesProperty(key);
       }
    }
 
    @Override
-   public AMQPMessage setBuffer(ByteBuf buffer) {
-      this.data = null;
-      return this;
+   public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+      if (extraProperties == null) {
+         return null;
+      } else {
+         return (byte[])extraProperties.removeProperty(key);
+      }
    }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message copy() {
-      checkBuffer();
-
-      ReadableBuffer view = data.duplicate();
+   // Message meta data access for Core and AMQP usage.
 
-      byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)];
-
-      view.position(0).limit(headerEnds);
-      view.get(newData, 0, headerEnds);
-      view.clear();
-      view.position(messagePaylodStart);
-      view.get(newData, headerEnds, view.remaining());
-
-      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
-      newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
-      return newEncode;
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
+      this.connectionID = connectionID;
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message copy(long newID) {
-      checkBuffer();
-      return copy().setMessageID(newID);
+   public String getConnectionID() {
+      return connectionID;
+   }
+
+   public long getMessageFormat() {
+      return messageFormat;
    }
 
    @Override
@@ -523,23 +878,31 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public AMQPMessage setExpiration(long expiration) {
-
-      Properties properties = getProperties();
-
       if (properties != null) {
          if (expiration <= 0) {
             properties.setAbsoluteExpiryTime(null);
          } else {
             properties.setAbsoluteExpiryTime(new Date(expiration));
          }
+      } else if (expiration > 0) {
+         properties = new Properties();
+         properties.setAbsoluteExpiryTime(new Date(expiration));
+      }
+
+      // We are overriding expiration with an Absolute expiration time so any
+      // previous Header based TTL also needs to be removed.
+      if (header != null) {
+         header.setTtl(null);
       }
-      this.expiration = expiration;
+
+      this.expiration = Math.max(0, expiration);
+
       return this;
    }
 
    @Override
    public Object getUserID() {
-      Properties properties = getProperties();
+      // User ID in Artemis API means Message ID
       if (properties != null && properties.getMessageId() != null) {
          return properties.getMessageId();
       } else {
@@ -548,14 +911,14 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    /**
-    * Before we added AMQP into Artemis / Hornetq, the name getUserID was already taken by JMSMessageID.
+    * Before we added AMQP into Artemis the name getUserID was already taken by JMSMessageID.
     * We cannot simply change the names now as it would break the API for existing clients.
     *
     * This is to return and read the proper AMQP userID.
-    * @return
+    *
+    * @return the UserID value in the AMQP Properties if one is present.
     */
    public Object getAMQPUserID() {
-      Properties properties = getProperties();
       if (properties != null && properties.getUserId() != null) {
          Binary binary = properties.getUserId();
          return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
@@ -570,29 +933,27 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public boolean isDurable() {
-      if (durable != null) {
-         return durable;
-      }
-
-      parseHeaders();
-
-      if (getHeader() != null && getHeader().getDurable() != null) {
-         durable = getHeader().getDurable();
-         return durable;
-      } else {
-         return durable != null ? durable : false;
-      }
+   public Object getDuplicateProperty() {
+      return null;
    }
 
    @Override
-   public Object getDuplicateProperty() {
-      return null;
+   public boolean isDurable() {
+      if (header != null && header.getDurable() != null) {
+         return header.getDurable();
+      } else {
+         return false;
+      }
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
-      this.durable = durable;
+      if (header == null) {
+         header = new Header();
+      }
+
+      header.setDurable(durable);  // Message needs to be re-encoded following this action.
+
       return this;
    }
 
@@ -602,11 +963,6 @@ public class AMQPMessage extends RefCountMessage {
       return addressSimpleString == null ? null : addressSimpleString.toString();
    }
 
-
-   public SimpleString cachedAddressSimpleString(String address) {
-      return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
-   }
-
    @Override
    public AMQPMessage setAddress(String address) {
       setAddress(cachedAddressSimpleString(address));
@@ -632,7 +988,6 @@ public class AMQPMessage extends RefCountMessage {
 
          if (address == null) {
             // if it still null, it will look for the address on the getTo();
-            Properties properties = getProperties();
             if (properties != null && properties.getTo() != null) {
                address = cachedAddressSimpleString(properties.getTo());
             }
@@ -641,10 +996,14 @@ public class AMQPMessage extends RefCountMessage {
       return address;
    }
 
+   private SimpleString cachedAddressSimpleString(String address) {
+      return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
+   }
+
    @Override
    public long getTimestamp() {
-      if (getProperties() != null && getProperties().getCreationTime() != null) {
-         return getProperties().getCreationTime().getTime();
+      if (properties != null && properties.getCreationTime() != null) {
+         return properties.getCreationTime().getTime();
       } else {
          return 0L;
       }
@@ -652,14 +1011,17 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
-      getProperties().setCreationTime(new Date(timestamp));
+      if (properties == null) {
+         properties = new Properties();
+      }
+      properties.setCreationTime(new Date(timestamp));
       return this;
    }
 
    @Override
    public byte getPriority() {
-      if (getHeader() != null && getHeader().getPriority() != null) {
-         return (byte) Math.min(getHeader().getPriority().intValue(), MAX_MESSAGE_PRIORITY);
+      if (header != null && header.getPriority() != null) {
+         return (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
       } else {
          return DEFAULT_MESSAGE_PRIORITY;
       }
@@ -667,349 +1029,180 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
-      getHeader().setPriority(UnsignedByte.valueOf(priority));
-      return this;
-   }
-
-   @Override
-   public void receiveBuffer(ByteBuf buffer) {
-
-   }
-
-   private synchronized void checkBuffer() {
-      if (!bufferValid) {
-         encodeProtonMessage();
-      }
-   }
-
-   private void encodeProtonMessage() {
-      int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
-      try {
-         getProtonMessage().encode(new NettyWritable(buffer));
-         byte[] bytes = new byte[buffer.writerIndex()];
-         buffer.readBytes(bytes);
-         data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
-         bufferValid = true;
-      } finally {
-         buffer.release();
-      }
-   }
-
-   @Override
-   public int getEncodeSize() {
-      checkBuffer();
-      // + 20checkBuffer is an estimate for the Header with the deliveryCount
-      return data.remaining() - messagePaylodStart + 20;
-   }
-
-   @Override
-   public void sendBuffer(ByteBuf buffer, int deliveryCount) {
-      checkBuffer();
-
-      int amqpDeliveryCount = deliveryCount - 1;
-
-      // If the re-delivering the message then the header must be re-encoded
-      // otherwise we want to write the original header if present.
-      if (amqpDeliveryCount > 0) {
-
-         Header header = getHeader();
-         if (header == null) {
-            header = new Header();
-            header.setDurable(durable);
-         }
-
-         synchronized (header) {
-            header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
-            TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
-            TLSEncode.getEncoder().writeObject(header);
-            TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
-         }
-      } else if (headerEnds > 0) {
-         buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer());
-      }
-
-      data.position(messagePaylodStart);
-      buffer.writeBytes(data.byteBuffer());
-      data.position(0);
-   }
-
-   /**
-    * Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire.
-    * <p>
-    * When possible this method will present the bytes to the caller without copying them into
-    * another buffer copy.  If copying is needed a new Netty buffer is created and returned. The
-    * caller should ensure that the reference count on the returned buffer is always decremented
-    * to avoid a leak in the case of a copied buffer being returned.
-    *
-    * @param deliveryCount
-    *       The new delivery count for this message.
-    *
-    * @return a Netty ByteBuf containing the encoded bytes of this Message instance.
-    */
-   public ReadableBuffer getSendBuffer(int deliveryCount) {
-      checkBuffer();
-
-      if (deliveryCount > 1) {
-         return createCopyWithNewDeliveryCount(deliveryCount);
-      } else if (headerEnds != messagePaylodStart) {
-         return createCopyWithoutDeliveryAnnotations();
-      } else {
-         // Common case message has no delivery annotations and this is the first delivery
-         // so no re-encoding or section skipping needed.
-         return data.duplicate();
-      }
-   }
-
-   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
-      assert headerEnds != messagePaylodStart;
-
-      // The original message had delivery annotations and so we must copy into a new
-      // buffer skipping the delivery annotations section as that is not meant to survive
-      // beyond this hop.
-      ReadableBuffer duplicate = data.duplicate();
-
-      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
-      result.writeBytes(duplicate.limit(headerEnds).byteBuffer());
-      duplicate.clear();
-      duplicate.position(messagePaylodStart);
-      result.writeBytes(duplicate.byteBuffer());
-
-      return new NettyReadable(result);
-   }
-
-   private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
-      assert deliveryCount > 1;
-
-      final int amqpDeliveryCount = deliveryCount - 1;
-      // If the re-delivering the message then the header must be re-encoded
-      // (or created if not previously present).  Any delivery annotations should
-      // be skipped as well in the resulting buffer.
-
-      final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
-
-      Header header = getHeader();
       if (header == null) {
          header = new Header();
-         header.setDurable(durable);
-      }
-
-      synchronized (header) {
-         // Updates or adds a Header section with the correct delivery count
-         header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
-         TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
-         TLSEncode.getEncoder().writeObject(header);
-         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
-      }
-
-      // This will skip any existing delivery annotations that might have been present
-      // in the original message.
-      data.position(messagePaylodStart);
-      result.writeBytes(data.byteBuffer());
-      data.position(0);
-
-      return new NettyReadable(result);
-   }
-
-   public TypedProperties createExtraProperties() {
-      if (extraProperties == null) {
-         extraProperties = new TypedProperties();
       }
-      return extraProperties;
-   }
-
-   public TypedProperties getExtraProperties() {
-      return extraProperties;
-   }
-
-   public AMQPMessage setExtraProperties(TypedProperties extraProperties) {
-      this.extraProperties = extraProperties;
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putExtraBytesProperty(SimpleString key, byte[] value) {
-      createExtraProperties().putBytesProperty(key, value);
+      header.setPriority(UnsignedByte.valueOf(priority));
       return this;
    }
 
    @Override
-   public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
-      if (extraProperties == null) {
-         return null;
+   public SimpleString getReplyTo() {
+      if (properties != null) {
+         return SimpleString.toSimpleString(properties.getReplyTo());
       } else {
-         return extraProperties.getBytesProperty(key);
-      }
-   }
-
-   @Override
-   public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
-      if (extraProperties == null) {
          return null;
-      } else {
-         return (byte[])extraProperties.removeProperty(key);
       }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
-      getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
-      getApplicationPropertiesMap().put(key, Byte.valueOf(value));
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
-      getApplicationPropertiesMap().put(key, value);
-      return this;
-   }
-
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
-      getApplicationPropertiesMap().put(key, Short.valueOf(value));
-      return this;
-   }
+   public AMQPMessage setReplyTo(SimpleString address) {
+      if (properties == null) {
+         properties = new Properties();
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
-      getApplicationPropertiesMap().put(key, Character.valueOf(value));
+      properties.setReplyTo(address != null ? address.toString() : null);
       return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
-      getApplicationPropertiesMap().put(key, Integer.valueOf(value));
-      return this;
-   }
+   public RoutingType getRoutingType() {
+      Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
-      getApplicationPropertiesMap().put(key, Long.valueOf(value));
-      return this;
-   }
+      if (routingType != null) {
+         return RoutingType.getType((byte) routingType);
+      } else {
+         routingType = getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+         if (routingType != null) {
+            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
+               return RoutingType.ANYCAST;
+            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
+               return RoutingType.MULTICAST;
+            }
+         } else {
+            return null;
+         }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
-      getApplicationPropertiesMap().put(key, Float.valueOf(value));
-      return this;
+         return null;
+      }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
-      getApplicationPropertiesMap().put(key, Double.valueOf(value));
+   public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
+      if (routingType == null) {
+         removeMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
+      } else {
+         setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
+      }
       return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
-      getApplicationPropertiesMap().put(key.toString(), Boolean.valueOf(value));
-      return this;
-   }
+   public SimpleString getGroupID() {
+      ensureMessageDataScanned();
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
-      return putByteProperty(key.toString(), value);
+      if (properties != null && properties.getGroupId() != null) {
+         return SimpleString.toSimpleString(properties.getGroupId(),
+            coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
+      } else {
+         return null;
+      }
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
-      return putBytesProperty(key.toString(), value);
-   }
+   public Long getScheduledDeliveryTime() {
+      if (scheduledTime < 0) {
+         Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
+         Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
-      return putShortProperty(key.toString(), value);
-   }
+         if (objscheduledTime != null && objscheduledTime instanceof Number) {
+            this.scheduledTime = ((Number) objscheduledTime).longValue();
+         } else if (objdelay != null && objdelay instanceof Number) {
+            this.scheduledTime = System.currentTimeMillis() + ((Number) objdelay).longValue();
+         } else {
+            this.scheduledTime = 0;
+         }
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
-      return putCharProperty(key.toString(), value);
+      return scheduledTime;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
-      return putIntProperty(key.toString(), value);
-   }
+   public AMQPMessage setScheduledDeliveryTime(Long time) {
+      if (time != null && time.longValue() > 0) {
+         setMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, time);
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
+         scheduledTime = time;
+      } else {
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
+         removeMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
+         scheduledTime = 0;
+      }
 
-   @Override
-   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
-      return putLongProperty(key.toString(), value);
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
-      return putFloatProperty(key.toString(), value);
+   public Object removeAnnotation(SimpleString key) {
+      return removeMessageAnnotation(Symbol.getSymbol(key.toString()));
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
-      return putDoubleProperty(key.toString(), value);
+   public Object getAnnotation(SimpleString key) {
+      return getMessageAnnotation(key.toString());
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
-      getApplicationPropertiesMap().put(key, value);
+   public AMQPMessage setAnnotation(SimpleString key, Object value) {
+      setMessageAnnotation(key.toString(), value);
       return this;
    }
 
+   // JMS Style property access methods.  These can result in additional decode of AMQP message
+   // data from Application properties.  Updates to application properties put the message in a
+   // dirty state and require a re-encode of the data to update all buffer state data; otherwise
+   // the next send of the Message will not include changes made here.
+
    @Override
-   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key,
-                                                                         Object value) throws ActiveMQPropertyConversionException {
-      getApplicationPropertiesMap().put(key, value);
-      return this;
+   public Object removeProperty(SimpleString key) {
+      return removeProperty(key.toString());
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key,
-                                                                         Object value) throws ActiveMQPropertyConversionException {
-      return putObjectProperty(key.toString(), value);
+   public Object removeProperty(String key) {
+      return getApplicationPropertiesMap(false).remove(key);
    }
 
    @Override
-   public Object removeProperty(String key) {
-      return getApplicationPropertiesMap().remove(key);
+   public boolean containsProperty(SimpleString key) {
+      return containsProperty(key.toString());
    }
 
    @Override
    public boolean containsProperty(String key) {
-      return getApplicationPropertiesMap().containsKey(key);
+      return getApplicationPropertiesMap(false).containsKey(key);
    }
 
    @Override
    public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Boolean) getApplicationPropertiesMap().get(key);
+      return (Boolean) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Byte) getApplicationPropertiesMap().get(key);
+      return (Byte) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Double) getApplicationPropertiesMap().get(key);
+      return (Double) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Integer) getApplicationPropertiesMap().get(key);
+      return (Integer) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Long) getApplicationPropertiesMap().get(key);
+      return (Long) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Object getObjectProperty(String key) {
       if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
-         if (getProperties() != null) {
-            return getProperties().getSubject();
+         if (properties != null) {
+            return properties.getSubject();
          }
       } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
@@ -1018,11 +1211,11 @@ public class AMQPMessage extends RefCountMessage {
       } else if (key.equals(MessageUtil.JMSXUSERID)) {
          return getAMQPUserID();
       } else if (key.equals(MessageUtil.CORRELATIONID_HEADER_NAME.toString())) {
-         if (getProperties() != null && getProperties().getCorrelationId() != null) {
-            return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(getProperties().getCorrelationId());
+         if (properties != null && properties.getCorrelationId() != null) {
+            return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
          }
       } else {
-         Object value = getApplicationPropertiesMap().get(key);
+         Object value = getApplicationPropertiesMap(false).get(key);
          if (value instanceof UnsignedInteger ||
              value instanceof UnsignedByte ||
              value instanceof UnsignedLong ||
@@ -1038,78 +1231,32 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Short) getApplicationPropertiesMap().get(key);
+      return (Short) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Float) getApplicationPropertiesMap().get(key);
+      return (Float) getApplicationPropertiesMap(false).get(key);
    }
 
    @Override
    public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
       if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
-         return getProperties().getSubject();
+         return properties.getSubject();
       } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
       } else {
-         return (String) getApplicationPropertiesMap().get(key);
+         return (String) getApplicationPropertiesMap(false).get(key);
       }
    }
 
    @Override
-   public Object removeAnnotation(SimpleString key) {
-      return removeSymbol(Symbol.getSymbol(key.toString()));
-   }
-
-   @Override
-   public Object getAnnotation(SimpleString key) {
-      return getSymbol(key.toString());
-   }
-
-   @Override
-   public AMQPMessage setAnnotation(SimpleString key, Object value) {
-      setSymbol(key.toString(), value);
-      return this;
-   }
-
-   @Override
-   public void reencode() {
-      parseHeaders();
-      getApplicationProperties();
-      getDeliveryAnnotations();
-      if (_header != null) getProtonMessage().setHeader(_header);
-      if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
-      if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations);
-      if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
-      if (_properties != null) {
-         if (address != null) {
-            _properties.setTo(address.toString());
-         }
-         getProtonMessage().setProperties(this._properties);
+   public Set<SimpleString> getPropertyNames() {
+      HashSet<SimpleString> values = new HashSet<>();
+      for (Object k : getApplicationPropertiesMap(false).keySet()) {
+         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
       }
-      bufferValid = false;
-      checkBuffer();
-   }
-
-   @Override
-   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
-      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
-   }
-
-   @Override
-   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
-      return (byte[]) getApplicationPropertiesMap().get(key);
-   }
-
-   @Override
-   public Object removeProperty(SimpleString key) {
-      return removeProperty(key.toString());
-   }
-
-   @Override
-   public boolean containsProperty(SimpleString key) {
-      return containsProperty(key.toString());
+      return values;
    }
 
    @Override
@@ -1123,6 +1270,11 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+      return (byte[]) getApplicationPropertiesMap(false).get(key);
+   }
+
+   @Override
    public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
       return getDoubleProperty(key.toString());
    }
@@ -1166,107 +1318,150 @@ public class AMQPMessage extends RefCountMessage {
    public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
       return getBytesProperty(key.toString());
    }
+   @Override
+   public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+      return SimpleString.toSimpleString((String) getApplicationPropertiesMap(false).get(key), getPropertyValuesPool());
+   }
+
+   // Core Message Application Property update methods.  Calling these puts the message in a dirty
+   // state and requires a re-encode of the data to update all buffer state data.  If no re-encode
+   // is done prior to the next dispatch, the old view of the message will be sent.
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
-      return putStringProperty(key.toString(), value.toString());
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
+      getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
-      return putStringProperty(key.toString(), value);
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
+      getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
+      return this;
    }
 
    @Override
-   public Set<SimpleString> getPropertyNames() {
-      HashSet<SimpleString> values = new HashSet<>();
-      for (Object k : getApplicationPropertiesMap().keySet()) {
-         values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
-      }
-      return values;
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
+      getApplicationPropertiesMap(true).put(key, value);
+      return this;
    }
 
    @Override
-   public int getMemoryEstimate() {
-      if (memoryEstimate == -1) {
-         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
-      }
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
+      getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
+      return this;
+   }
 
-      return memoryEstimate;
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
+      getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
+      return this;
    }
 
    @Override
-   public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
-      try {
-         return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
-      } catch (Exception e) {
-         throw new RuntimeException(e.getMessage(), e);
-      }
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
+      getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
+      return this;
    }
 
    @Override
-   public ICoreMessage toCore() {
-      return toCore(null);
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
+      getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
+      return this;
    }
 
    @Override
-   public SimpleString getLastValueProperty() {
-      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
+      getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
+      return this;
    }
 
    @Override
-   public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) {
-      return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
+      getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
+      return this;
    }
 
    @Override
-   public SimpleString getReplyTo() {
-      if (getProperties() != null) {
-         return SimpleString.toSimpleString(getProperties().getReplyTo());
-      } else {
-         return null;
-      }
+   public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
+      getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value));
+      return this;
    }
 
    @Override
-   public AMQPMessage setReplyTo(SimpleString address) {
-      if (getProperties() != null) {
-         getProperties().setReplyTo(address != null ? address.toString() : null);
-      }
+   public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) {
+      return putByteProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) {
+      return putBytesProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) {
+      return putShortProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) {
+      return putCharProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) {
+      return putIntProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) {
+      return putLongProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) {
+      return putFloatProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) {
+      return putDoubleProperty(key.toString(), value);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
+      getApplicationPropertiesMap(true).put(key, value);
       return this;
    }
 
    @Override
-   public int getPersistSize() {
-      checkBuffer();
-      return DataConstants.SIZE_INT + internalPersistSize();
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
+      getApplicationPropertiesMap(true).put(key, value);
+      return this;
    }
 
-   private int internalPersistSize() {
-      return data.remaining();
+   @Override
+   public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
+      return putObjectProperty(key.toString(), value);
    }
 
    @Override
-   public void persist(ActiveMQBuffer targetRecord) {
-      checkBuffer();
-      targetRecord.writeInt(internalPersistSize());
-      if (data.hasArray()) {
-         targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
-      } else {
-         targetRecord.writeBytes(data.byteBuffer());
-      }
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) {
+      return putStringProperty(key.toString(), value.toString());
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
-      int size = record.readInt();
-      byte[] recordArray = new byte[size];
-      record.readBytes(recordArray);
-      this.messagePaylodStart = 0; // whatever was persisted will be sent
-      this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
-      this.bufferValid = true;
-      this.durable = true; // it's coming from the journal, so it's durable
-      parseHeaders();
+   public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
+      return putStringProperty(key.toString(), value);
+   }
+
+   @Override
+   public SimpleString getLastValueProperty() {
+      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) {
+      return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
    }
 
    @Override
@@ -1275,12 +1470,30 @@ public class AMQPMessage extends RefCountMessage {
          ", messageID=" + getMessageID() +
          ", address=" + getAddress() +
          ", size=" + getEncodeSize() +
-         ", applicationProperties=" + getApplicationProperties() +
-         ", properties=" + getProperties() +
+         ", applicationProperties=" + applicationProperties +
+         ", properties=" + properties +
          ", extraProperties = " + getExtraProperties() +
          "]";
    }
 
+   @Override
+   public synchronized boolean acceptsConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         return true;
+      } else {
+         return !rejectedConsumers.contains(consumer);
+      }
+   }
+
+   @Override
+   public synchronized void rejectConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         rejectedConsumers = new HashSet<>();
+      }
+
+      rejectedConsumers.add(consumer);
+   }
+
    private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
    }
@@ -1288,9 +1501,4 @@ public class AMQPMessage extends RefCountMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
-
-   @Override
-   public long getPersistentSize() throws ActiveMQException {
-      return getEncodeSize();
-   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index bec0beb..c688124 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -50,7 +50,6 @@ public class AMQPMessagePersister extends MessagePersister {
          SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
    }
 
-
    /** Sub classes must add the first short as the protocol-id */
    @Override
    public void encode(ActiveMQBuffer buffer, Message record) {
@@ -62,7 +61,6 @@ public class AMQPMessagePersister extends MessagePersister {
       record.persist(buffer);
    }
 
-
    @Override
    public Message decode(ActiveMQBuffer buffer, Message record) {
       long id = buffer.readLong();
@@ -76,5 +74,4 @@ public class AMQPMessagePersister extends MessagePersister {
       }
       return record;
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a851a8f9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 7c4f425..fc31fc2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -63,12 +63,25 @@ public final class AMQPMessageSupport {
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
+    *
+    * @deprecated Use SCHEDULED_DELIVERY_TIME instead, since the annotation is not JMS specific; this constant will be removed.
     */
+   @Deprecated
    public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
 
    /**
     * Attribute used to mark the Application defined delivery time assigned to the message
     */
+   public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
+
+   /**
+    * Attribute used to mark the Application defined delivery delay assigned to the message
+    */
+   public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
+
+   /**
+    * Attribute used to mark the routing type assigned to the message
+    */
    public static final Symbol ROUTING_TYPE = Symbol.getSymbol("x-opt-routing-type");
 
    /**
@@ -227,6 +240,24 @@ public final class AMQPMessageSupport {
    }
 
    /**
+    * Check whether the content-type given matches the expected value.
+    *
+    * @param expected
+    *        content type string to compare against or null if not expected to be set
+    * @param actual
+    *        the AMQP content type symbol from the Properties section
+    *
+    * @return true if content type matches
+    */
+   public static boolean isContentType(String expected, Symbol actual) {
+      if (expected == null) {
+         return actual == null;
+      } else {
+         return expected.equals(actual != null ? actual.toString() : actual);
+      }
+   }
+
+   /**
     * @param contentType
     *        the contentType of the received message
     * @return the character set to use, or null if not to treat the message as text