You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:13 UTC

[20/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

ARTEMIS-1009 Pure Message Encoding.

with this we could send and receive message in their raw format,
without requiring conversions to Core.

- MessageImpl and ServerMessage are removed as part of this
- AMQPMessage and CoreMessage will have the specialized message format for each protocol
- The protocol manager is now responsible to send the message
- The message will provide an encoder for journal and paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64681865
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64681865
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64681865

Branch: refs/heads/artemis-1009
Commit: 64681865090a77dc8ea7fe653379d06a67849ab8
Parents: 503b112
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Feb 20 15:55:15 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 2 20:04:30 2017 -0500

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |    7 +
 .../cli/commands/tools/XmlDataExporter.java     |   18 +-
 .../cli/commands/tools/XmlDataExporterUtil.java |    7 +-
 .../cli/commands/tools/XmlDataImporter.java     |    3 +-
 .../org/apache/activemq/artemis/Closeable.java  |   22 +
 .../artemis/api/core/ActiveMQBuffer.java        |   13 +
 .../artemis/api/core/ActiveMQBuffers.java       |   15 +
 .../activemq/artemis/api/core/SimpleString.java |   34 +
 .../core/buffers/impl/ChannelBufferWrapper.java |   83 +-
 .../artemis/core/persistence/Persister.java     |   30 +
 .../apache/activemq/artemis/utils/ByteUtil.java |    8 +
 .../activemq/artemis/utils/TypedProperties.java |   74 +-
 .../apache/activemq/artemis/utils/UTF8Util.java |   36 +-
 .../artemis/utils/TypedPropertiesTest.java      |   10 +-
 .../config/ActiveMQDefaultConfiguration.java    |   20 -
 .../activemq/artemis/api/core/Message.java      |  627 +++++-----
 .../artemis/api/core/RefCountMessage.java       |   81 ++
 .../api/core/RefCountMessageListener.java       |   31 +
 .../artemis/api/core/client/ClientMessage.java  |    8 -
 .../artemis/api/core/encode/BodyType.java       |   22 +
 .../artemis/api/core/encode/MessageBody.java    |   28 +
 .../impl/ResetLimitWrappedActiveMQBuffer.java   |   24 +-
 .../client/impl/ClientLargeMessageImpl.java     |   22 +-
 .../core/client/impl/ClientMessageImpl.java     |   74 +-
 .../core/client/impl/ClientMessageInternal.java |    5 +
 .../core/client/impl/ClientProducerImpl.java    |   41 +-
 .../CompressedLargeMessageControllerImpl.java   |    6 +
 .../client/impl/LargeMessageControllerImpl.java |   15 +
 .../artemis/core/message/BodyEncoder.java       |   55 -
 .../artemis/core/message/LargeBodyEncoder.java  |   55 +
 .../artemis/core/message/impl/CoreMessage.java  | 1066 ++++++++++++++++++
 .../core/message/impl/CoreMessagePersister.java |   66 ++
 .../artemis/core/message/impl/MessageImpl.java  | 1059 -----------------
 .../core/message/impl/MessageInternal.java      |   57 -
 .../core/impl/ActiveMQSessionContext.java       |   16 +-
 .../core/protocol/core/impl/ChannelImpl.java    |    1 +
 .../core/protocol/core/impl/PacketImpl.java     |   30 +-
 .../core/impl/RemotingConnectionImpl.java       |    1 +
 .../core/impl/wireformat/MessagePacket.java     |   18 +-
 .../SessionReceiveClientLargeMessage.java       |    5 +-
 .../wireformat/SessionReceiveLargeMessage.java  |   14 +-
 .../impl/wireformat/SessionReceiveMessage.java  |   60 +-
 .../SessionSendContinuationMessage.java         |    8 +-
 .../wireformat/SessionSendLargeMessage.java     |   12 +-
 .../impl/wireformat/SessionSendMessage.java     |   54 +-
 .../activemq/artemis/reader/MapMessageUtil.java |    4 +-
 .../spi/core/remoting/SessionContext.java       |   11 +-
 .../artemis/message/CoreMessageTest.java        |  365 ++++++
 .../jdbc/store/journal/JDBCJournalImpl.java     |   36 +-
 .../jdbc/store/journal/JDBCJournalRecord.java   |    7 +-
 .../jms/client/ActiveMQBytesMessage.java        |    4 +-
 .../artemis/jms/client/ActiveMQMessage.java     |    8 +-
 .../jms/transaction/JMSTransactionDetail.java   |    6 +-
 .../artemis/core/journal/EncoderPersister.java  |   51 +
 .../activemq/artemis/core/journal/Journal.java  |   55 +-
 .../journal/impl/AbstractJournalUpdateTask.java |    3 +-
 .../core/journal/impl/FileWrapperJournal.java   |   26 +-
 .../artemis/core/journal/impl/JournalBase.java  |   63 +-
 .../core/journal/impl/JournalCompactor.java     |    9 +-
 .../artemis/core/journal/impl/JournalImpl.java  |   62 +-
 .../impl/dataformat/JournalAddRecord.java       |   20 +-
 .../impl/dataformat/JournalAddRecordTX.java     |   17 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  761 +++++++++++++
 .../amqp/broker/AMQPMessagePersister.java       |   75 ++
 .../amqp/broker/AMQPSessionCallback.java        |   33 +-
 .../broker/ProtonProtocolManagerFactory.java    |   14 +
 .../amqp/converter/ProtonMessageConverter.java  |   12 +-
 .../converter/jms/ServerJMSBytesMessage.java    |    8 +-
 .../amqp/converter/jms/ServerJMSMapMessage.java |    3 +-
 .../amqp/converter/jms/ServerJMSMessage.java    |   21 +-
 .../converter/jms/ServerJMSObjectMessage.java   |    3 +-
 .../converter/jms/ServerJMSStreamMessage.java   |    5 +-
 .../converter/jms/ServerJMSTextMessage.java     |    3 +-
 .../converter/message/AMQPMessageSupport.java   |   28 +-
 .../message/AMQPNativeInboundTransformer.java   |   44 -
 .../message/AMQPRawInboundTransformer.java      |   62 -
 .../converter/message/InboundTransformer.java   |    5 +-
 .../message/JMSMappingInboundTransformer.java   |   34 +-
 .../message/JMSMappingOutboundTransformer.java  |    3 +-
 .../amqp/proton/AMQPConnectionContext.java      |    4 +
 .../proton/ProtonServerReceiverContext.java     |   32 +-
 .../amqp/proton/ProtonServerSenderContext.java  |   25 +-
 .../amqp/proton/ProtonTransactionHandler.java   |    3 +-
 .../amqp/proton/handler/ProtonHandler.java      |    2 +-
 .../protocol/amqp/util/DeliveryUtil.java        |   13 -
 .../protocol/amqp/util/NettyReadable.java       |  139 +++
 .../amqp/converter/TestConversions.java         |   62 +-
 .../JMSMappingInboundTransformerTest.java       |    6 +-
 .../JMSMappingOutboundTransformerTest.java      |   38 +-
 .../JMSTransformationSpeedComparisonTest.java   |   36 +-
 .../message/MessageTransformationTest.java      |   19 +-
 .../protocol/amqp/message/AMQPMessageTest.java  |   63 ++
 .../core/protocol/mqtt/MQTTPublishManager.java  |   22 +-
 .../protocol/mqtt/MQTTRetainMessageManager.java |    8 +-
 .../core/protocol/mqtt/MQTTSessionCallback.java |   10 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |   15 +-
 .../openwire/OpenWireMessageConverter.java      |   20 +-
 .../core/protocol/openwire/amq/AMQConsumer.java |    6 +-
 .../core/protocol/openwire/amq/AMQSession.java  |    9 +-
 .../protocol/openwire/util/OpenWireUtil.java    |    7 +-
 .../ActiveMQStompProtocolMessageBundle.java     |    3 +-
 .../core/protocol/stomp/StompConnection.java    |   14 +-
 .../protocol/stomp/StompProtocolManager.java    |    6 +-
 .../core/protocol/stomp/StompSession.java       |   35 +-
 .../artemis/core/protocol/stomp/StompUtils.java |    6 +-
 .../stomp/VersionedStompFrameHandler.java       |   19 +-
 .../stomp/v12/StompFrameHandlerV12.java         |    4 +-
 .../artemis/core/config/Configuration.java      |    8 -
 .../core/config/impl/ConfigurationImpl.java     |   32 -
 .../deployers/impl/FileConfigurationParser.java |    4 -
 .../activemq/artemis/core/filter/Filter.java    |    4 +-
 .../artemis/core/filter/impl/FilterImpl.java    |   12 +-
 .../management/impl/AddressControlImpl.java     |    6 +-
 .../core/management/impl/QueueControlImpl.java  |   10 +-
 .../impl/openmbean/OpenTypeSupport.java         |   16 +-
 .../artemis/core/paging/PagedMessage.java       |    5 +-
 .../artemis/core/paging/PagingStore.java        |    8 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |   16 +-
 .../cursor/impl/PageSubscriptionImpl.java       |    4 +-
 .../activemq/artemis/core/paging/impl/Page.java |    2 +-
 .../core/paging/impl/PagedMessageImpl.java      |   66 +-
 .../core/paging/impl/PagingStoreImpl.java       |   52 +-
 .../core/persistence/StorageManager.java        |   16 +-
 .../journal/AbstractJournalStorageManager.java  |   60 +-
 .../impl/journal/AddMessageRecord.java          |    8 +-
 .../impl/journal/DescribeJournal.java           |   17 +-
 .../impl/journal/JournalRecordIds.java          |    3 +
 .../impl/journal/JournalStorageManager.java     |   14 +-
 .../journal/LargeMessageTXFailureCallback.java  |    6 +-
 .../impl/journal/LargeServerMessageImpl.java    |  117 +-
 .../journal/LargeServerMessagePersister.java    |   73 ++
 .../journal/codec/LargeMessageEncoding.java     |   55 -
 .../journal/codec/LargeMessagePersister.java    |   63 ++
 .../nullpm/NullStorageLargeServerMessage.java   |   18 +-
 .../impl/nullpm/NullStorageManager.java         |   16 +-
 .../artemis/core/postoffice/Binding.java        |    9 +-
 .../artemis/core/postoffice/Bindings.java       |    6 +-
 .../artemis/core/postoffice/PostOffice.java     |   18 +-
 .../core/postoffice/impl/BindingsImpl.java      |   27 +-
 .../core/postoffice/impl/DivertBinding.java     |    8 +-
 .../core/postoffice/impl/LocalQueueBinding.java |    8 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   84 +-
 .../core/protocol/ServerPacketDecoder.java      |    6 +-
 .../core/ServerSessionPacketHandler.java        |   82 +-
 .../core/impl/ActiveMQPacketHandler.java        |    2 +-
 .../core/impl/CoreProtocolManagerFactory.java   |   14 +
 .../protocol/core/impl/CoreSessionCallback.java |    8 +-
 .../impl/wireformat/ReplicationAddMessage.java  |   14 +-
 .../wireformat/ReplicationAddTXMessage.java     |   13 +-
 .../wireformat/ReplicationPageWriteMessage.java |    2 +-
 .../core/remoting/server/RemotingService.java   |    4 +
 .../server/impl/RemotingServiceImpl.java        |   11 +-
 .../core/replication/ReplicatedJournal.java     |   52 +-
 .../core/replication/ReplicationEndpoint.java   |    7 +-
 .../core/replication/ReplicationManager.java    |   11 +-
 .../core/server/ActiveMQServerLogger.java       |    9 +-
 .../activemq/artemis/core/server/Bindable.java  |    6 +-
 .../artemis/core/server/LargeServerMessage.java |    3 +-
 .../artemis/core/server/MessageReference.java   |   10 +-
 .../activemq/artemis/core/server/Queue.java     |    3 +-
 .../artemis/core/server/ServerMessage.java      |   78 --
 .../artemis/core/server/ServerSession.java      |   22 +-
 .../core/server/cluster/Transformer.java        |    4 +-
 .../core/server/cluster/impl/BridgeImpl.java    |   17 +-
 .../cluster/impl/ClusterConnectionBridge.java   |   14 +-
 .../core/server/cluster/impl/Redistributor.java |    3 +-
 .../cluster/impl/RemoteQueueBindingImpl.java    |   14 +-
 .../core/server/impl/ActiveMQServerImpl.java    |    1 +
 .../artemis/core/server/impl/DivertImpl.java    |   10 +-
 .../artemis/core/server/impl/JournalLoader.java |    6 +-
 .../core/server/impl/LastValueQueue.java        |   10 +-
 .../core/server/impl/MessageReferenceImpl.java  |   24 +-
 .../server/impl/PostOfficeJournalLoader.java    |    7 +-
 .../artemis/core/server/impl/QueueImpl.java     |   65 +-
 .../artemis/core/server/impl/RefsOperation.java |    4 +-
 .../core/server/impl/ScaleDownHandler.java      |   45 +-
 .../core/server/impl/ServerConsumerImpl.java    |   27 +-
 .../core/server/impl/ServerMessageImpl.java     |  341 ------
 .../core/server/impl/ServerSessionImpl.java     |  118 +-
 .../server/management/ManagementService.java    |    5 +-
 .../management/impl/ManagementServiceImpl.java  |   12 +-
 .../core/transaction/TransactionDetail.java     |    9 +-
 .../transaction/impl/CoreTransactionDetail.java |    6 +-
 .../spi/core/protocol/MessageConverter.java     |    7 +-
 .../spi/core/protocol/MessagePersister.java     |   88 ++
 .../spi/core/protocol/ProtocolManager.java      |    2 +
 .../core/protocol/ProtocolManagerFactory.java   |   15 +
 .../spi/core/protocol/SessionCallback.java      |    6 +-
 .../resources/schema/artemis-configuration.xsd  |   16 -
 .../core/config/impl/ConfigurationImplTest.java |    9 -
 .../artemis/core/filter/impl/FilterTest.java    |   12 +-
 .../group/impl/ClusteredResetMockTest.java      |    5 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  196 ++--
 .../transaction/impl/TransactionImplTest.java   |   16 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |   13 +-
 .../resources/ConfigurationTest-full-config.xml |    2 -
 .../test/resources/artemis-configuration.xsd    |   16 -
 .../jms/example/HatColourChangeTransformer.java |    2 +-
 .../example/AddForwardingTimeTransformer.java   |    2 +-
 pom.xml                                         |    3 +-
 .../amqp/client/util/UnmodifiableDelivery.java  |    5 +
 .../journal/gcfree/EncodersBench.java           |    5 +-
 .../byteman/JMSBridgeReconnectionTest.java      |    4 +-
 .../tests/extras/byteman/MessageCopyTest.java   |  163 ---
 .../integration/DuplicateDetectionTest.java     |    6 +-
 .../tests/integration/amqp/ProtonTest.java      |   22 +-
 .../integration/client/AckBatchSizeTest.java    |   14 +-
 .../integration/client/AcknowledgeTest.java     |  177 ++-
 .../tests/integration/client/ConsumerTest.java  |  163 ++-
 .../integration/client/HangConsumerTest.java    |    7 +-
 .../InVMNonPersistentMessageBufferTest.java     |   36 +-
 .../client/InterruptedLargeMessageTest.java     |   10 +-
 .../integration/client/LargeMessageTest.java    |    4 +-
 .../integration/cluster/bridge/BridgeTest.java  |   10 +-
 .../cluster/bridge/SimpleTransformer.java       |   61 +-
 .../distribution/ClusterHeadersRemovedTest.java |    5 +-
 .../distribution/MessageRedistributionTest.java |    4 +-
 .../tests/integration/divert/DivertTest.java    |    5 +-
 .../interceptors/InterceptorTest.java           |    8 +-
 .../integration/journal/MessageJournalTest.java |  133 +++
 .../journal/NIOJournalCompactTest.java          |    6 +-
 .../integration/karaf/ContainerBaseTest.java    |   64 ++
 .../tests/integration/karaf/KarafBaseTest.java  |  212 ++++
 .../karaf/distribution/ArtemisFeatureTest.java  |  101 ++
 .../karaf/distribution/package-info.java        |   21 +
 .../karaf/version/ProbeRemoteServer.java        |   51 +
 .../integration/karaf/version/RemoteTest.java   |   38 +
 .../karaf/version/VersionWireTest.java          |  104 ++
 .../integration/karaf/version/package-info.java |   21 +
 .../management/ManagementServiceImplTest.java   |   24 +-
 .../DeleteMessagesOnStartupTest.java            |   10 +-
 .../replication/ReplicationTest.java            |   71 +-
 .../integration/server/FakeStorageManager.java  |    6 +-
 .../storage/PersistMultiThreadTest.java         |   31 +-
 .../stress/paging/PageCursorStressTest.java     |   24 +-
 .../core/server/impl/QueueConcurrentTest.java   |    6 +-
 tests/unit-tests/pom.xml                        |    6 +
 .../core/journal/impl/JournalImplTestUnit.java  |    2 +-
 .../unit/core/message/impl/MessageImplTest.java |    9 +-
 .../tests/unit/core/paging/impl/PageTest.java   |   42 +-
 .../core/paging/impl/PagingManagerImplTest.java |   16 +-
 .../core/paging/impl/PagingStoreImplTest.java   |   38 +-
 .../core/postoffice/impl/BindingsImplTest.java  |   16 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |    9 +-
 .../impl/WildcardAddressManagerUnitTest.java    |   12 +-
 .../unit/core/server/impl/QueueImplTest.java    |    4 +-
 .../unit/core/server/impl/fakes/FakeFilter.java |    7 +-
 .../server/impl/fakes/FakeJournalLoader.java    |    6 +-
 .../core/server/impl/fakes/FakePostOffice.java  |   22 +-
 .../tests/unit/util/FakePagingManager.java      |    7 +-
 .../artemis/tests/unit/util/MemorySizeTest.java |    4 +-
 .../artemis/tests/unit/util/UTF8Test.java       |   10 +-
 252 files changed, 6442 insertions(+), 4208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 408aef5..2816aaf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.Artemis;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -50,16 +51,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
 public class PrintData extends OptionalLocking {
 
+   static {
+      MessagePersister.registerPersister(CoreProtocolManagerFactory.ID, CoreMessagePersister.getInstance());
+   }
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index 4f99181..b57b5c5 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -50,7 +50,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -75,7 +75,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
@@ -367,7 +367,7 @@ public final class XmlDataExporter extends OptionalLocking {
       // Order here is important.  We must process the messages from the journal before we process those from the page
       // files in order to get the messages in the right order.
       for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
-         printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+         printSingleMessageAsXML(messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
       }
 
       printPagedMessagesAsXML();
@@ -381,6 +381,8 @@ public final class XmlDataExporter extends OptionalLocking {
     */
    private void printPagedMessagesAsXML() {
       try {
+
+         // TODO-now: fix encodings
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
          final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
          ExecutorFactory executorFactory = new ExecutorFactory() {
@@ -456,7 +458,7 @@ public final class XmlDataExporter extends OptionalLocking {
       }
    }
 
-   private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException {
+   private void printSingleMessageAsXML(Message message, List<String> queues) throws XMLStreamException {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
       printMessageAttributes(message);
       printMessageProperties(message);
@@ -466,7 +468,7 @@ public final class XmlDataExporter extends OptionalLocking {
       messagesPrinted++;
    }
 
-   private void printMessageBody(ServerMessage message) throws XMLStreamException {
+   private void printMessageBody(Message message) throws XMLStreamException {
       xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
 
       if (message.isLargeMessage()) {
@@ -479,7 +481,7 @@ public final class XmlDataExporter extends OptionalLocking {
 
    private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
-      BodyEncoder encoder = null;
+      LargeBodyEncoder encoder = null;
 
       try {
          encoder = message.getBodyEncoder();
@@ -522,7 +524,7 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end QUEUES_PARENT
    }
 
-   private void printMessageProperties(ServerMessage message) throws XMLStreamException {
+   private void printMessageProperties(Message message) throws XMLStreamException {
       xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
       for (SimpleString key : message.getPropertyNames()) {
          Object value = message.getObjectProperty(key);
@@ -539,7 +541,7 @@ public final class XmlDataExporter extends OptionalLocking {
       xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
    }
 
-   private void printMessageAttributes(ServerMessage message) throws XMLStreamException {
+   private void printMessageAttributes(Message message) throws XMLStreamException {
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
       xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
index 8ee7678..a3807bd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java
@@ -17,10 +17,9 @@
 package org.apache.activemq.artemis.cli.commands.tools;
 
 import com.google.common.base.Preconditions;
-
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.utils.Base64;
 
 /**
@@ -92,10 +91,10 @@ public class XmlDataExporterUtil {
     * @param message
     * @return
     */
-   public static String encodeMessageBody(final ServerMessage message) {
+   public static String encodeMessageBody(final Message message) {
       Preconditions.checkNotNull(message, "ServerMessage can not be null");
 
-      int size = message.getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
+      int size = ((CoreMessage)message.toCore()).getEndOfBodyPosition() - message.getBodyBuffer().readerIndex();
       byte[] buffer = new byte[size];
       message.getBodyBuffer().readBytes(buffer);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
index 8e2bb9f..0f06738 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java
@@ -59,7 +59,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.cli.commands.ActionAbstract;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -387,7 +386,7 @@ public final class XmlDataImporter extends ActionAbstract {
          logger.debug(logMessage);
       }
 
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
       try (ClientProducer producer = session.createProducer(destination)) {
          producer.send(message);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
new file mode 100644
index 0000000..2f00c5d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/Closeable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis;
+
+public interface Closeable {
+   void close(boolean failed);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index 5446f3f..3a208a6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
     */
    void writeBytes(ByteBuffer src);
 
+
+   /**
+    * Transfers the specified source buffer's data to this buffer starting at
+    * the current {@code writerIndex} until the source buffer's position
+    * reaches its limit, and increases the {@code writerIndex} by the
+    * number of the transferred bytes.
+    *
+    * @param src The source buffer
+    * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+    *                                   {@code this.writableBytes}
+    */
+   void writeBytes(ByteBuf src, int srcIndex, int length);
+
    /**
     * Returns a copy of this buffer's readable bytes.  Modifying the content
     * of the returned buffer or this buffer does not affect each other at all.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
index 32f9279..25fcfea 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
 
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -76,6 +77,20 @@ public final class ActiveMQBuffers {
    }
 
    /**
+    * Creates an ActiveMQBuffer wrapping an underlying ByteBuf
+    *
+    * The position on this buffer won't affect the position on the inner buffer
+    *
+    * @param underlying the underlying NIO ByteBuffer
+    * @return an ActiveMQBuffer wrapping the underlying NIO ByteBuffer
+    */
+   public static ActiveMQBuffer wrappedBuffer(final ByteBuf underlying) {
+      ActiveMQBuffer buff = new ChannelBufferWrapper(underlying.duplicate());
+
+      return buff;
+   }
+
+   /**
     * Creates an ActiveMQBuffer wrapping an underlying byte array
     *
     * @param underlying the underlying byte array

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index b7f70c6..e8530e6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
@@ -134,6 +135,39 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
    }
 
 
+   public static SimpleString readNullableSimpleString(ByteBuf buffer) {
+      int b = buffer.readByte();
+      if (b == DataConstants.NULL) {
+         return null;
+      }
+      return readSimpleString(buffer);
+   }
+
+
+   public static SimpleString readSimpleString(ByteBuf buffer) {
+      int len = buffer.readInt();
+      byte[] data = new byte[len];
+      buffer.readBytes(data);
+      return new SimpleString(data);
+   }
+
+   public static void writeNullableSimpleString(ByteBuf buffer, SimpleString val) {
+      if (val == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeSimpleString(buffer, val);
+      }
+   }
+
+   public static void writeSimpleString(ByteBuf buffer, SimpleString val) {
+      byte[] data = val.getData();
+      buffer.writeInt(data.length);
+      buffer.writeBytes(data);
+   }
+
+
+
    public SimpleString subSeq(final int start, final int end) {
       int len = data.length >> 1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 690dbd7..b2660fa 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -66,11 +66,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public SimpleString readNullableSimpleString() {
-      int b = buffer.readByte();
-      if (b == DataConstants.NULL) {
-         return null;
-      }
-      return readSimpleStringInternal();
+      return SimpleString.readNullableSimpleString(buffer);
    }
 
    @Override
@@ -84,14 +80,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public SimpleString readSimpleString() {
-      return readSimpleStringInternal();
-   }
-
-   private SimpleString readSimpleStringInternal() {
-      int len = buffer.readInt();
-      byte[] data = new byte[len];
-      buffer.readBytes(data);
-      return new SimpleString(data);
+      return SimpleString.readSimpleString(buffer);
    }
 
    @Override
@@ -111,11 +100,21 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
       } else if (len < 0xfff) {
          return readUTF();
       } else {
-         return readSimpleStringInternal().toString();
+         return SimpleString.readNullableSimpleString(buffer).toString();
       }
    }
 
    @Override
+   public void writeNullableString(String val) {
+      UTF8Util.writeNullableString(buffer, val);
+   }
+
+   @Override
+   public void writeUTF(String utf) {
+      UTF8Util.saveUTF(buffer, utf);
+   }
+
+   @Override
    public String readUTF() {
       return UTF8Util.readUTF(this);
    }
@@ -127,62 +126,17 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public void writeNullableSimpleString(final SimpleString val) {
-      if (val == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         writeSimpleStringInternal(val);
-      }
-   }
-
-   @Override
-   public void writeNullableString(final String val) {
-      if (val == null) {
-         buffer.writeByte(DataConstants.NULL);
-      } else {
-         buffer.writeByte(DataConstants.NOT_NULL);
-         writeStringInternal(val);
-      }
+      SimpleString.writeNullableSimpleString(buffer, val);
    }
 
    @Override
    public void writeSimpleString(final SimpleString val) {
-      writeSimpleStringInternal(val);
-   }
-
-   private void writeSimpleStringInternal(final SimpleString val) {
-      byte[] data = val.getData();
-      buffer.writeInt(data.length);
-      buffer.writeBytes(data);
+      SimpleString.writeSimpleString(buffer, val);
    }
 
    @Override
    public void writeString(final String val) {
-      writeStringInternal(val);
-   }
-
-   private void writeStringInternal(final String val) {
-      int length = val.length();
-
-      buffer.writeInt(length);
-
-      if (length < 9) {
-         // If very small it's more performant to store char by char
-         for (int i = 0; i < val.length(); i++) {
-            buffer.writeShort((short) val.charAt(i));
-         }
-      } else if (length < 0xfff) {
-         // Store as UTF - this is quicker than char by char for most strings
-         writeUTF(val);
-      } else {
-         // Store as SimpleString, since can't store utf > 0xffff in length
-         writeSimpleStringInternal(new SimpleString(val));
-      }
-   }
-
-   @Override
-   public void writeUTF(final String utf) {
-      UTF8Util.saveUTF(this, utf);
+      UTF8Util.writeString(buffer, val);
    }
 
    @Override
@@ -576,6 +530,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      buffer.writeBytes(src, srcIndex, length);
+   }
+
+   @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       buffer.writeBytes(src.byteBuf(), srcIndex, length);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
new file mode 100644
index 0000000..fd68a77
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+public interface Persister<T extends Object> {
+
+   int getEncodeSize(T record);
+
+   void encode(ActiveMQBuffer buffer, T record);
+
+   T decode(ActiveMQBuffer buffer, T record);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index bee8790..e70891d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -101,6 +101,14 @@ public class ByteUtil {
    }
 
    public static String bytesToHex(byte[] bytes, int groupSize) {
+      if (bytes == null) {
+         return "NULL";
+      }
+
+      if (bytes.length == 0) {
+         return "[]";
+      }
+
       char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
       int outPos = 0;
       for (int j = 0; j < bytes.length; j++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
index 56cec48..a421484 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/TypedProperties.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@@ -47,21 +47,23 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING;
  * This implementation follows section 3.5.4 of the <i>Java Message Service</i> specification
  * (Version 1.1 April 12, 2002).
  * <p>
- * TODO - should have typed property getters and do conversions herein
  */
 public final class TypedProperties {
 
-   private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
-
    private Map<SimpleString, PropertyValue> properties;
 
    private volatile int size;
 
-   private boolean internalProperties;
-
    public TypedProperties() {
    }
 
+   /**
+    *  Return the number of properites
+    * */
+   public int size() {
+      return properties.size();
+   }
+
    public int getMemoryOffset() {
       // The estimate is basically the encode size + 2 object references for each entry in the map
       // Note we don't include the attributes or anything else since they already included in the memory estimate
@@ -75,10 +77,6 @@ public final class TypedProperties {
       size = other.size;
    }
 
-   public boolean hasInternalProperties() {
-      return internalProperties;
-   }
-
    public void putBooleanProperty(final SimpleString key, final boolean value) {
       checkCreateProperties();
       doPutValue(key, new BooleanValue(value));
@@ -321,7 +319,7 @@ public final class TypedProperties {
       }
    }
 
-   public synchronized void decode(final ActiveMQBuffer buffer) {
+   public synchronized void decode(final ByteBuf buffer) {
       byte b = buffer.readByte();
 
       if (b == DataConstants.NULL) {
@@ -406,7 +404,7 @@ public final class TypedProperties {
       }
    }
 
-   public synchronized void encode(final ActiveMQBuffer buffer) {
+   public synchronized void encode(final ByteBuf buffer) {
       if (properties == null) {
          buffer.writeByte(DataConstants.NULL);
       } else {
@@ -499,10 +497,6 @@ public final class TypedProperties {
    }
 
    private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
-      if (key.startsWith(AMQ_PROPNAME)) {
-         internalProperties = true;
-      }
-
       PropertyValue oldValue = properties.put(key, value);
       if (oldValue != null) {
          size += value.encodeSize() - oldValue.encodeSize();
@@ -547,7 +541,7 @@ public final class TypedProperties {
 
       abstract Object getValue();
 
-      abstract void write(ActiveMQBuffer buffer);
+      abstract void write(ByteBuf buffer);
 
       abstract int encodeSize();
 
@@ -568,7 +562,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.NULL);
       }
 
@@ -587,7 +581,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private BooleanValue(final ActiveMQBuffer buffer) {
+      private BooleanValue(final ByteBuf buffer) {
          val = buffer.readBoolean();
       }
 
@@ -597,7 +591,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BOOLEAN);
          buffer.writeBoolean(val);
       }
@@ -617,7 +611,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private ByteValue(final ActiveMQBuffer buffer) {
+      private ByteValue(final ByteBuf buffer) {
          val = buffer.readByte();
       }
 
@@ -627,7 +621,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BYTE);
          buffer.writeByte(val);
       }
@@ -646,7 +640,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private BytesValue(final ActiveMQBuffer buffer) {
+      private BytesValue(final ByteBuf buffer) {
          int len = buffer.readInt();
          val = new byte[len];
          buffer.readBytes(val);
@@ -658,7 +652,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.BYTES);
          buffer.writeInt(val.length);
          buffer.writeBytes(val);
@@ -679,7 +673,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private ShortValue(final ActiveMQBuffer buffer) {
+      private ShortValue(final ByteBuf buffer) {
          val = buffer.readShort();
       }
 
@@ -689,7 +683,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.SHORT);
          buffer.writeShort(val);
       }
@@ -708,7 +702,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private IntValue(final ActiveMQBuffer buffer) {
+      private IntValue(final ByteBuf buffer) {
          val = buffer.readInt();
       }
 
@@ -718,7 +712,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.INT);
          buffer.writeInt(val);
       }
@@ -737,7 +731,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private LongValue(final ActiveMQBuffer buffer) {
+      private LongValue(final ByteBuf buffer) {
          val = buffer.readLong();
       }
 
@@ -747,7 +741,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.LONG);
          buffer.writeLong(val);
       }
@@ -766,7 +760,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private FloatValue(final ActiveMQBuffer buffer) {
+      private FloatValue(final ByteBuf buffer) {
          val = Float.intBitsToFloat(buffer.readInt());
       }
 
@@ -776,7 +770,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.FLOAT);
          buffer.writeInt(Float.floatToIntBits(val));
       }
@@ -796,7 +790,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private DoubleValue(final ActiveMQBuffer buffer) {
+      private DoubleValue(final ByteBuf buffer) {
          val = Double.longBitsToDouble(buffer.readLong());
       }
 
@@ -806,7 +800,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.DOUBLE);
          buffer.writeLong(Double.doubleToLongBits(val));
       }
@@ -825,7 +819,7 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private CharValue(final ActiveMQBuffer buffer) {
+      private CharValue(final ByteBuf buffer) {
          val = (char) buffer.readShort();
       }
 
@@ -835,7 +829,7 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.CHAR);
          buffer.writeShort((short) val);
       }
@@ -854,8 +848,8 @@ public final class TypedProperties {
          this.val = val;
       }
 
-      private StringValue(final ActiveMQBuffer buffer) {
-         val = buffer.readSimpleString();
+      private StringValue(final ByteBuf buffer) {
+         val = SimpleString.readSimpleString(buffer);
       }
 
       @Override
@@ -864,9 +858,9 @@ public final class TypedProperties {
       }
 
       @Override
-      public void write(final ActiveMQBuffer buffer) {
+      public void write(final ByteBuf buffer) {
          buffer.writeByte(DataConstants.STRING);
-         buffer.writeSimpleString(val);
+         SimpleString.writeSimpleString(buffer, val);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
index e75395b..84e1557 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UTF8Util.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
 
 import java.lang.ref.SoftReference;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
 import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
 
@@ -29,15 +31,43 @@ import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
  */
 public final class UTF8Util {
 
+
+   private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+
+   private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+
    private UTF8Util() {
       // utility class
    }
+   public static void writeNullableString(ByteBuf buffer, final String val) {
+      if (val == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         writeString(buffer, val);
+      }
+   }
 
-   private static final boolean isTrace = ActiveMQUtilLogger.LOGGER.isTraceEnabled();
+   public static void writeString(final ByteBuf buffer, final String val) {
+      int length = val.length();
 
-   private static final ThreadLocal<SoftReference<StringUtilBuffer>> currenBuffer = new ThreadLocal<>();
+      buffer.writeInt(length);
+
+      if (length < 9) {
+         // If very small it's more performant to store char by char
+         for (int i = 0; i < val.length(); i++) {
+            buffer.writeShort((short) val.charAt(i));
+         }
+      } else if (length < 0xfff) {
+         // Store as UTF - this is quicker than char by char for most strings
+         saveUTF(buffer, val);
+      } else {
+         // Store as SimpleString, since can't store utf > 0xffff in length
+         SimpleString.writeSimpleString(buffer, new SimpleString(val));
+      }
+   }
 
-   public static void saveUTF(final ActiveMQBuffer out, final String str) {
+   public static void saveUTF(final ByteBuf out, final String str) {
       StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
 
       if (str.length() > 0xffff) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 8013e96..cb6c8fe 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -187,12 +187,12 @@ public class TypedPropertiesTest {
       props.putSimpleStringProperty(keyToRemove, RandomUtil.randomSimpleString());
 
       ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
-      props.encode(buffer);
+      props.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
 
       TypedProperties decodedProps = new TypedProperties();
-      decodedProps.decode(buffer);
+      decodedProps.decode(buffer.byteBuf());
 
       TypedPropertiesTest.assertEqualsTypeProperties(props, decodedProps);
 
@@ -200,7 +200,7 @@ public class TypedPropertiesTest {
 
       // After removing a property, you should still be able to encode the Property
       props.removeProperty(keyToRemove);
-      props.encode(buffer);
+      props.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
    }
@@ -210,12 +210,12 @@ public class TypedPropertiesTest {
       TypedProperties emptyProps = new TypedProperties();
 
       ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
-      emptyProps.encode(buffer);
+      emptyProps.encode(buffer.byteBuf());
 
       Assert.assertEquals(props.getEncodeSize(), buffer.writerIndex());
 
       TypedProperties decodedProps = new TypedProperties();
-      decodedProps.decode(buffer);
+      decodedProps.decode(buffer.byteBuf());
 
       TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 38ec105..c0d9db6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -262,12 +262,6 @@ public final class ActiveMQDefaultConfiguration {
    // The minimal number of data files before we can start compacting
    private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;
 
-   // XXX Only meant to be used by project developers
-   private static int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
-
-   // XXX Only meant to be used by project developers
-   private static boolean DEFAULT_RUN_SYNC_SPEED_TEST = false;
-
    // Interval to log server specific information (e.g. memory usage etc)
    private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;
 
@@ -801,20 +795,6 @@ public final class ActiveMQDefaultConfiguration {
    }
 
    /**
-    * XXX Only meant to be used by project developers
-    */
-   public static int getDefaultJournalPerfBlastPages() {
-      return DEFAULT_JOURNAL_PERF_BLAST_PAGES;
-   }
-
-   /**
-    * XXX Only meant to be used by project developers
-    */
-   public static boolean isDefaultRunSyncSpeedTest() {
-      return DEFAULT_RUN_SYNC_SPEED_TEST;
-   }
-
-   /**
     * Interval to log server specific information (e.g. memory usage etc)
     */
    public static long getDefaultServerDumpInterval() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 80116ed..4a5381c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,10 +16,14 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.activemq.artemis.utils.UUID;
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
 
 /**
  * A Message is a routable instance that has a payload.
@@ -48,9 +52,41 @@ import org.apache.activemq.artemis.utils.UUID;
  * <p>
  * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a
  * {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown.
+ *
+ *
+ * User cases that will be covered by Message
+ *
+ * Receiving a buffer:
+ *
+ * Message encode = new CoreMessage(); // or any other implementation
+ * encode.receiveBuffer(buffer);
+ *
+ *
+ * Sending to a buffer:
+ *
+ * Message encode;
+ * size = encode.getEncodeSize();
+ * encode.encodeDirectly(bufferOutput);
+ *
+ *
+ * Disabling temporary buffer:
+ *
+ * // This will make the message to only be encoded directly to the output stream, useful on client core API
+ * encode.disableInternalBuffer();
+
  */
 public interface Message {
 
+
+   SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
+
+   SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
+
+   SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
+
+   // used by the bridges to set duplicates
+   SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
+
    /**
     * the actual time the message was expired.
     * * *
@@ -129,6 +165,60 @@ public interface Message {
 
    byte STREAM_TYPE = 6;
 
+
+   void messageChanged();
+
+   /**
+    * Careful: Unless you are changing the body of the message, prefer getReadOnlyBodyBuffer
+    */
+   ActiveMQBuffer getBodyBuffer();
+
+   ActiveMQBuffer getReadOnlyBodyBuffer();
+
+   /** Used in the cases of large messages */
+   LargeBodyEncoder getBodyEncoder() throws ActiveMQException;
+
+   /** Context can be used by the application server to inject extra control, like a protocol specific on the server.
+    * There is only one per Object, use it wisely!
+    *
+    * Note: the intent of this was to replace PageStore reference on Message, but it will be later increased by adidn a ServerPojo
+    * */
+   RefCountMessageListener getContext();
+
+   Message setContext(RefCountMessageListener context);
+
+   /** The buffer will belong to this message, until release is called. */
+   Message setBuffer(ByteBuf buffer);
+
+   // TODO-now: Do we need this?
+   byte getType();
+
+   // TODO-now: Do we need this?
+   Message setType(byte type);
+
+   /**
+    * Returns whether this message is a <em>large message</em> or a regular message.
+    */
+   boolean isLargeMessage();
+
+   /**
+    * TODO: There's currently some treatment on LargeMessage that is done for server's side large message
+    *       This needs to be refactored, this Method shouldn't be used at all.
+    * @Deprecated do not use this, internal use only. *It will* be removed for sure even on minor releases.
+    * */
+   @Deprecated
+   default boolean isServerMessage() {
+      return false;
+   }
+
+   ByteBuf getBuffer();
+
+   /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+   Message copy();
+
+   /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+   Message copy(long newID);
+
    /**
     * Returns the messageID.
     * <br>
@@ -136,39 +226,43 @@ public interface Message {
     */
    long getMessageID();
 
+   Message setMessageID(long id);
+
    /**
-    * Returns the userID - this is an optional user specified UUID that can be set to identify the message
-    * and will be passed around with the message
-    *
-    * @return the user id
+    * Returns the expiration time of this message.
     */
-   UUID getUserID();
+   long getExpiration();
 
    /**
-    * Sets the user ID
+    * Sets the expiration of this message.
     *
-    * @param userID
+    * @param expiration expiration time
     */
-   Message setUserID(UUID userID);
+   Message setExpiration(long expiration);
 
    /**
-    * Returns the address this message is sent to.
+    * Returns whether this message is expired or not.
     */
-   SimpleString getAddress();
+   default boolean isExpired() {
+      if (getExpiration() == 0) {
+         return false;
+      }
+
+      return System.currentTimeMillis() - getExpiration() >= 0;
+   }
+
 
    /**
-    * Sets the address to send this message to.
+    * Returns the userID - this is an optional user specified UUID that can be set to identify the message
+    * and will be passed around with the message
     *
-    * @param address address to send the message to
+    * @return the user id
     */
-   Message setAddress(SimpleString address);
+   Object getUserID();
 
-   /**
-    * Returns this message type.
-    * <p>
-    * See fields {@literal *_TYPE} for possible values.
-    */
-   byte getType();
+   Message setUserID(Object userID);
+
+   void copyHeadersAndProperties(final Message msg);
 
    /**
     * Returns whether this message is durable or not.
@@ -182,36 +276,28 @@ public interface Message {
     */
    Message setDurable(boolean durable);
 
-   /**
-    * Returns the expiration time of this message.
-    */
-   long getExpiration();
+   Persister<Message> getPersister();
 
-   /**
-    * Returns whether this message is expired or not.
-    */
-   boolean isExpired();
+   Object getProtocol();
 
-   /**
-    * Sets the expiration of this message.
-    *
-    * @param expiration expiration time
-    */
-   Message setExpiration(long expiration);
+   Message setProtocol(Object protocol);
+
+   Object getBody();
+
+   BodyType getBodyType();
+
+   Message setBody(BodyType type, Object body);
+
+   String getAddress();
+
+   Message setAddress(String address);
+
+   SimpleString getAddressSimpleString();
+
+   Message setAddress(SimpleString address);
 
-   /**
-    * Returns the message timestamp.
-    * <br>
-    * The timestamp corresponds to the time this message
-    * was handled by an ActiveMQ Artemis server.
-    */
    long getTimestamp();
 
-   /**
-    * Sets the message timestamp.
-    *
-    * @param timestamp timestamp
-    */
    Message setTimestamp(long timestamp);
 
    /**
@@ -230,164 +316,128 @@ public interface Message {
     */
    Message setPriority(byte priority);
 
-   /**
-    * Returns the size of the <em>encoded</em> message.
-    */
-   int getEncodeSize();
+   /** Used to receive this message from an encoded medium buffer */
+   void receiveBuffer(ByteBuf buffer);
 
-   /**
-    * Returns whether this message is a <em>large message</em> or a regular message.
-    */
-   boolean isLargeMessage();
+   /** Used to send this message to an encoded medium buffer.
+    * @param buffer the buffer used.
+    * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */
+   void sendBuffer(ByteBuf buffer, int deliveryCount);
 
-   /**
-    * Returns the message body as an ActiveMQBuffer
-    */
-   ActiveMQBuffer getBodyBuffer();
+   int getPersistSize();
 
-   /**
-    * Writes the input byte array to the message body ActiveMQBuffer
-    */
-   Message writeBodyBufferBytes(byte[] bytes);
+   void persist(ActiveMQBuffer targetRecord);
 
-   /**
-    * Writes the input String to the message body ActiveMQBuffer
-    */
-   Message writeBodyBufferString(String string);
+   void reloadPersistence(ActiveMQBuffer record);
 
-   /**
-    * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
-    * of this buffer should not impact the underlying buffer.
-    */
-   ActiveMQBuffer getBodyBufferDuplicate();
+   default void releaseBuffer() {
+      ByteBuf buffer = getBuffer();
+      if (buffer != null) {
+         buffer.release();
+      }
+      setBuffer(null);
+   }
 
-   // Properties
-   // -----------------------------------------------------------------
+   default String getText() {
+      if (getBodyType() == BodyType.Text) {
+         return getBody().toString();
+      } else {
+         return null;
+      }
+   }
 
-   /**
-    * Puts a boolean property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putBooleanProperty(SimpleString key, boolean value);
+   // TODO-now: move this to some utility class
+   default void referenceOriginalMessage(final Message original, String originalQueue) {
+      String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
 
-   /**
-    * @see #putBooleanProperty(SimpleString, boolean)
-    */
-   Message putBooleanProperty(String key, boolean value);
+      if (queueOnMessage != null) {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+      } else if (originalQueue != null) {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+      }
 
-   /**
-    * Puts a byte property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putByteProperty(SimpleString key, byte value);
+      if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
 
-   /**
-    * @see #putByteProperty(SimpleString, byte)
-    */
-   Message putByteProperty(String key, byte value);
+         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+      } else {
+         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
 
-   /**
-    * Puts a byte[] property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putBytesProperty(SimpleString key, byte[] value);
+         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+      }
 
-   /**
-    * @see #putBytesProperty(SimpleString, byte[])
-    */
-   Message putBytesProperty(String key, byte[] value);
+      // reset expiry
+      setExpiration(0);
+   }
 
    /**
-    * Puts a short property in this message.
-    *
-    * @param key   property name
-    * @param value property value
+    * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+    * TODO-NOW: this can probably be replaced by an utility.
+    * @return
     */
-   Message putShortProperty(SimpleString key, short value);
+   default byte[] getDuplicateIDBytes() {
+      Object duplicateID = getDuplicateProperty();
 
-   /**
-    * @see #putShortProperty(SimpleString, short)
-    */
-   Message putShortProperty(String key, short value);
+      if (duplicateID == null) {
+         return null;
+      } else {
+         if (duplicateID instanceof SimpleString) {
+            return ((SimpleString) duplicateID).getData();
+         } else if (duplicateID instanceof String) {
+            return new SimpleString(duplicateID.toString()).getData();
+         } else {
+            return (byte[]) duplicateID;
+         }
+      }
+   }
 
    /**
-    * Puts a char property in this message.
-    *
-    * @param key   property name
-    * @param value property value
+    * it will translate a property named HDR_DUPLICATE_DETECTION_ID.
+    * TODO-NOW: this can probably be replaced by an utility.
+    * @return
     */
-   Message putCharProperty(SimpleString key, char value);
+   default Object getDuplicateProperty() {
+      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString());
+   }
 
-   /**
-    * @see #putCharProperty(SimpleString, char)
-    */
-   Message putCharProperty(String key, char value);
 
-   /**
-    * Puts an int property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putIntProperty(SimpleString key, int value);
+   Message putBooleanProperty(String key, boolean value);
 
-   /**
-    * @see #putIntProperty(SimpleString, int)
-    */
-   Message putIntProperty(String key, int value);
+   Message putByteProperty(String key, byte value);
 
-   /**
-    * Puts a long property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putLongProperty(SimpleString key, long value);
+   Message putBytesProperty(String key, byte[] value);
 
-   /**
-    * @see #putLongProperty(SimpleString, long)
-    */
-   Message putLongProperty(String key, long value);
+   Message putShortProperty(String key, short value);
 
-   /**
-    * Puts a float property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putFloatProperty(SimpleString key, float value);
+   Message putCharProperty(String key, char value);
 
-   /**
-    * @see #putFloatProperty(SimpleString, float)
-    */
-   Message putFloatProperty(String key, float value);
+   Message putIntProperty(String key, int value);
 
-   /**
-    * Puts a double property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putDoubleProperty(SimpleString key, double value);
+   Message putLongProperty(String key, long value);
+
+   Message putFloatProperty(String key, float value);
 
-   /**
-    * @see #putDoubleProperty(SimpleString, double)
-    */
    Message putDoubleProperty(String key, double value);
 
-   /**
-    * Puts a SimpleString property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putStringProperty(SimpleString key, SimpleString value);
+
+
+   Message putBooleanProperty(SimpleString key, boolean value);
+
+   Message putByteProperty(SimpleString key, byte value);
+
+   Message putBytesProperty(SimpleString key, byte[] value);
+
+   Message putShortProperty(SimpleString key, short value);
+
+   Message putCharProperty(SimpleString key, char value);
+
+   Message putIntProperty(SimpleString key, int value);
+
+   Message putLongProperty(SimpleString key, long value);
+
+   Message putFloatProperty(SimpleString key, float value);
+
+   Message putDoubleProperty(SimpleString key, double value);
 
    /**
     * Puts a String property in this message.
@@ -397,202 +447,125 @@ public interface Message {
     */
    Message putStringProperty(String key, String value);
 
-   /**
-    * Puts an Object property in this message. <br>
-    * Accepted types are:
-    * <ul>
-    * <li>Boolean</li>
-    * <li>Byte</li>
-    * <li>Short</li>
-    * <li>Character</li>
-    * <li>Integer</li>
-    * <li>Long</li>
-    * <li>Float</li>
-    * <li>Double</li>
-    * <li>String</li>
-    * <li>SimpleString</li>
-    * </ul>
-    * Using any other type will throw a PropertyConversionException.
-    *
-    * @param key   property name
-    * @param value property value
-    * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property
-    *                                             types.
-    */
-   Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #putObjectProperty(SimpleString, Object)
-    */
    Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Removes the property corresponding to the specified key.
-    *
-    * @param key property name
-    * @return the value corresponding to the specified key or @{code null}
-    */
-   Object removeProperty(SimpleString key);
+   Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
 
-   /**
-    * @see #removeProperty(SimpleString)
-    */
    Object removeProperty(String key);
 
-   /**
-    * Returns {@code true} if this message contains a property with the given key, {@code false} else.
-    *
-    * @param key property name
-    */
-   boolean containsProperty(SimpleString key);
-
-   /**
-    * @see #containsProperty(SimpleString)
-    */
    boolean containsProperty(String key);
 
-   /**
-    * Returns the property corresponding to the specified key as a Boolean.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean
-    */
-   Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getBooleanProperty(SimpleString)
-    */
    Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Byte.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte
-    */
-   Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getByteProperty(SimpleString)
-    */
    Byte getByteProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Double.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double
-    */
-   Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getDoubleProperty(SimpleString)
-    */
    Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as an Integer.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer
-    */
-   Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getIntProperty(SimpleString)
-    */
    Integer getIntProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Long.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long
-    */
-   Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getLongProperty(SimpleString)
-    */
    Long getLongProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key
-    */
-   Object getObjectProperty(SimpleString key);
-
-   /**
-    * @see #getBooleanProperty(SimpleString)
-    */
    Object getObjectProperty(String key);
 
-   /**
-    * Returns the property corresponding to the specified key as a Short.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short
-    */
-   Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getShortProperty(SimpleString)
-    */
    Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Float.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float
-    */
-   Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getFloatProperty(SimpleString)
-    */
    Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a String.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a String
-    */
-   String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getStringProperty(SimpleString)
-    */
    String getStringProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a SimpleString.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString
-    */
-   SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getSimpleStringProperty(SimpleString)
-    */
    SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a byte[].
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[]
-    */
+   byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+
+
+   Object removeProperty(SimpleString key);
+
+   boolean containsProperty(SimpleString key);
+
+   Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Object getObjectProperty(SimpleString key);
+
+   Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
    byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
+   Message putStringProperty(SimpleString key, SimpleString value);
+
    /**
-    * @see #getBytesProperty(SimpleString)
+    * Returns the size of the <em>encoded</em> message.
     */
-   byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+   int getEncodeSize();
 
    /**
     * Returns all the names of the properties for this message.
     */
    Set<SimpleString> getPropertyNames();
 
+
+
+   int getRefCount();
+
+   int incrementRefCount() throws Exception;
+
+   int decrementRefCount() throws Exception;
+
+   int incrementDurableRefCount();
+
+   int decrementDurableRefCount();
+
    /**
     * @return Returns the message in Map form, useful when encoding to JSON
     */
-   Map<String, Object> toMap();
+   default Map<String, Object> toMap() {
+      Map map = toPropertyMap();
+      map.put("messageID", getMessageID());
+      Object userID = getUserID();
+      if (getUserID() != null) {
+         map.put("userID", "ID:" + userID.toString());
+      }
+
+      map.put("address", getAddress());
+      map.put("type", getBodyType().toString());
+      map.put("durable", isDurable());
+      map.put("expiration", getExpiration());
+      map.put("timestamp", getTimestamp());
+      map.put("priority", (int)getPriority());
+
+      return map;
+   }
 
    /**
     * @return Returns the message properties in Map form, useful when encoding to JSON
     */
-   Map<String, Object> toPropertyMap();
+   default Map<String, Object> toPropertyMap() {
+      Map map = new HashMap<>();
+      for (SimpleString name : getPropertyNames()) {
+         map.put(name.toString(), getObjectProperty(name.toString()));
+      }
+      return map;
+   }
+
+
+   /** This should make you convert your message into Core format. */
+   Message toCore();
+
+   int getMemoryEstimate();
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
new file mode 100644
index 0000000..64dd44d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class RefCountMessage implements Message {
+
+   private final AtomicInteger durableRefCount = new AtomicInteger();
+
+   private final AtomicInteger refCount = new AtomicInteger();
+
+   private RefCountMessageListener context;
+
+   @Override
+   public Message setContext(RefCountMessageListener context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public RefCountMessageListener getContext() {
+      return context;
+   }
+
+   @Override
+   public int getRefCount() {
+      return refCount.get();
+   }
+
+   @Override
+   public int incrementRefCount() throws Exception {
+      int count = refCount.incrementAndGet();
+      if (context != null) {
+         context.nonDurableUp(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int incrementDurableRefCount() {
+      int count = durableRefCount.incrementAndGet();
+      if (context != null) {
+         context.durableUp(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int decrementDurableRefCount() {
+      int count = durableRefCount.decrementAndGet();
+      if (context != null) {
+         context.durableDown(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int decrementRefCount() throws Exception {
+      int count = refCount.decrementAndGet();
+      if (context != null) {
+         context.nonDurableDown(this, count);
+      }
+      return count;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
new file mode 100644
index 0000000..e68dffd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+/** If {@link Message#getContext()} != null and is implementing this interface.
+ *  These methods will be called during refCount operations */
+public interface RefCountMessageListener {
+
+   void durableUp(Message message, int durableCount);
+
+   void durableDown(Message message, int durableCount);
+
+   void nonDurableUp(Message message, int nonDurableCoun);
+
+   void nonDurableDown(Message message, int nonDurableCoun);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
index e87d365..daded00 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
@@ -234,24 +234,16 @@ public interface ClientMessage extends Message {
     * Overridden from {@link Message} to enable fluent API
     */
    @Override
-   ClientMessage putStringProperty(SimpleString key, SimpleString value);
-
-   /**
-    * Overridden from {@link Message} to enable fluent API
-    */
-   @Override
    ClientMessage putStringProperty(String key, String value);
 
    /**
     * Overridden from {@link Message} to enable fluent API
     */
-   @Override
    ClientMessage writeBodyBufferBytes(byte[] bytes);
 
    /**
     * Overridden from {@link Message} to enable fluent API
     */
-   @Override
    ClientMessage writeBodyBufferString(String string);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
new file mode 100644
index 0000000..743583b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+public enum BodyType {
+   Undefined, Bytes, Map, Object, Stream, Text
+}