You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/01 17:21:10 UTC

[16/23] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
new file mode 100644
index 0000000..f7821b9
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+import java.nio.ByteBuffer;
+
+public interface MessageBody {
+   Object getBody();
+
+   ByteBuffer getBodyArray();
+
+   BodyType getType();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 900305f..b5d5474 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -20,18 +20,18 @@ import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 
 /**
  * A ResetLimitWrappedActiveMQBuffer
- * TODO: Move this to commons
  */
 public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper {
 
    private final int limit;
 
-   private MessageInternal message;
+   private Message message;
 
    /**
     * We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions,
@@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
     *
     * @param message
     */
-   public void setMessage(MessageInternal message) {
+   public void setMessage(Message message) {
       this.message = message;
    }
 
-   public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
+   public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) {
       // a wrapped inside a wrapper will increase the stack size.
       // we fixed this here due to some profiling testing
       this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
    }
 
-   public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
+   public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) {
       // a wrapped inside a wrapper will increase the stack size.
       // we fixed this here due to some profiling testing
       super(buffer);
@@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
 
    private void changed() {
       if (message != null) {
-         message.bodyChanged();
+         message.messageChanged();
       }
    }
 
@@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
 
    @Override
    public void resetReaderIndex() {
-      changed();
-
       buffer.readerIndex(limit);
    }
 
@@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
       super.writeBytes(src);
    }
 
+
+   @Override
+   public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+      changed();
+
+      super.writeBytes(src, srcIndex, length);
+   }
+
    @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       changed();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index c3cbceb..cbfaf6f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public int getEncodeSize() {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          return super.getEncodeSize();
       } else {
          return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
@@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
          throw new RuntimeException(e.getMessage(), e);
       }
 
-      return bodyBuffer;
+      return writableBuffer;
    }
 
    @Override
@@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          // The body was rebuilt on the client, so we need to behave as a regular message on this case
          super.saveToOutputStream(out);
       } else {
@@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          super.setOutputStream(out);
       } else {
          largeMessageController.setOutputStream(out);
@@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          return super.waitOutputStreamCompletion(timeMilliseconds);
       } else {
          return largeMessageController.waitCompletion(timeMilliseconds);
@@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public void discardBody() {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          super.discardBody();
       } else {
          largeMessageController.discardUnusedPackets();
@@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
    }
 
    private void checkBuffer() throws ActiveMQException {
-      if (bodyBuffer == null) {
+      if (writableBuffer == null) {
 
          long bodySize = this.largeMessageSize + BODY_OFFSET;
          if (bodySize > Integer.MAX_VALUE) {
             bodySize = Integer.MAX_VALUE;
          }
-         createBody((int) bodySize);
+         initBuffer((int) bodySize);
 
-         bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+         writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
 
-         largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer));
+         largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));
       }
    }
 
@@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    public void retrieveExistingData(ClientMessageInternal clMessage) {
       this.messageID = clMessage.getMessageID();
-      this.address = clMessage.getAddress();
+      this.address = clMessage.getAddressSimpleString();
       this.setUserID(clMessage.getUserID());
       this.setFlowControlSize(clMessage.getFlowControlSize());
       this.setDeliveryCount(clMessage.getDeliveryCount());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 7bf8eb7..9472b01 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
 
 /**
  * A ClientMessageImpl
  */
-public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal {
+public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
 
    // added this constant here so that the client package have no dependency on JMS
    public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
@@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    public ClientMessageImpl() {
    }
 
+   protected ClientMessageImpl(ClientMessageImpl other) {
+      super(other);
+   }
+
+   @Override
+   public ClientMessageImpl setDurable(boolean durable) {
+      super.setDurable(durable);
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl setExpiration(long expiration) {
+      super.setExpiration(expiration);
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl setPriority(byte priority) {
+      super.setPriority(priority);
+      return this;
+   }
+
+   /** Forwards the user ID to the superclass setter (as the sibling setters do) and returns {@code this} for chaining. */
+   @Override
+   public ClientMessageImpl setUserID(UUID userID) {
+      super.setUserID(userID);
+      return this;
+   }
+
    /*
     * Construct messages before sending
     */
@@ -66,12 +97,18 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
                             final long timestamp,
                             final byte priority,
                             final int initialMessageBufferSize) {
-      super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
+      this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
+           setPriority(priority).initBuffer(initialMessageBufferSize);
    }
 
    @Override
-   public boolean isServerMessage() {
-      return false;
+   public void setAddressTransient(SimpleString address) {
+      this.address = address;
+   }
+
+   @Override
+   public TypedProperties getProperties() {
+      return this.checkProperties();
    }
 
    @Override
@@ -108,6 +145,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       return this;
    }
 
+
+   @Override
+   public void checkCompletion() throws ActiveMQException {
+   }
+
    @Override
    public int getFlowControlSize() {
       if (flowControlSize < 0) {
@@ -141,7 +183,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
 
    @Override
    public String toString() {
-      return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+      return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + getProperties().toString() + "]";
    }
 
    @Override
@@ -189,7 +231,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    }
 
    @Override
-   public BodyEncoder getBodyEncoder() throws ActiveMQException {
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
       return new DecodingContext();
    }
 
@@ -307,15 +349,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
 
    @Override
    public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) {
-      return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+      getBodyBuffer().writeBytes(bytes);
+      return this;
    }
 
    @Override
    public ClientMessageImpl writeBodyBufferString(String string) {
-      return (ClientMessageImpl) super.writeBodyBufferString(string);
+      getBodyBuffer().writeString(string);
+      return this;
    }
 
-   private final class DecodingContext implements BodyEncoder {
+   private final class DecodingContext implements LargeBodyEncoder {
 
       private DecodingContext() {
       }
@@ -347,9 +391,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       @Override
       public int encode(final ActiveMQBuffer bufferOut, final int size) {
          byte[] bytes = new byte[size];
-         getWholeBuffer().readBytes(bytes);
+         buffer.readBytes(bytes);
          bufferOut.writeBytes(bytes, 0, size);
          return size;
       }
    }
+
+   @Override
+   public Message copy() {
+      return new ClientMessageImpl(this);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 07d4719..878f799 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.io.InputStream;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.utils.TypedProperties;
@@ -44,4 +46,7 @@ public interface ClientMessageInternal extends ClientMessage {
    void discardBody();
 
    boolean isCompressed();
+
+   InputStream getBodyInputStream();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 1dfbe72..ce16011 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -27,8 +27,8 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
 import org.apache.activemq.artemis.utils.DeflaterReader;
@@ -217,7 +217,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
       session.startCall();
 
       try {
-         MessageInternal msgI = (MessageInternal) msg;
+         Message msgI = msg;
 
          ClientProducerCredits theCredits;
 
@@ -225,8 +225,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
          // a note about the second check on the writerIndexSize,
          // If it's a server's message, it means this is being done through the bridge or some special consumer on the
          // server's on which case we can't' convert the message into large at the servers
-         if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
-            msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) {
+         if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msgI) != null || msgI.isLargeMessage() ||
+            msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
             isLarge = true;
          } else {
             isLarge = false;
@@ -258,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
          session.workDone();
 
          if (isLarge) {
-            largeMessageSend(sendBlocking, msgI, theCredits, handler);
+            largeMessageSend(sendBlocking, (CoreMessage)msgI, theCredits, handler);
          } else {
             sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler);
          }
@@ -267,8 +267,12 @@ public class ClientProducerImpl implements ClientProducerInternal {
       }
    }
 
+   // Returns the body stream of a client message, or null for any other Message (avoids a ClassCastException on the blind cast).
+   private InputStream getBodyInputStream(Message msgI) {
+      return msgI instanceof ClientMessageInternal ? ((ClientMessageInternal) msgI).getBodyInputStream() : null;
+   }
+
    private void sendRegularMessage(final SimpleString sendingAddress,
-                                   final MessageInternal msgI,
+                                   final Message msgI,
                                    final boolean sendBlocking,
                                    final ClientProducerCredits theCredits,
                                    final SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -301,7 +305,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSend(final boolean sendBlocking,
-                                 final MessageInternal msgI,
+                                 final CoreMessage msgI,
                                  final ClientProducerCredits credits,
                                  SendAcknowledgementHandler handler) throws ActiveMQException {
       logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking);
@@ -313,22 +317,22 @@ public class ClientProducerImpl implements ClientProducerInternal {
       }
 
       // msg.getBody() could be Null on LargeServerMessage
-      if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) {
-         msgI.getWholeBuffer().readerIndex(0);
+      if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) {
+         msgI.getBuffer().readerIndex(0);
       }
 
       InputStream input;
 
       if (msgI.isServerMessage()) {
          largeMessageSendServer(sendBlocking, msgI, credits, handler);
-      } else if ((input = msgI.getBodyInputStream()) != null) {
+      } else if ((input = getBodyInputStream(msgI)) != null) {
          largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
       } else {
          largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
       }
    }
 
-   private void sendInitialLargeMessageHeader(MessageInternal msgI,
+   private void sendInitialLargeMessageHeader(Message msgI,
                                               ClientProducerCredits credits) throws ActiveMQException {
       int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
 
@@ -348,17 +352,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendServer(final boolean sendBlocking,
-                                       final MessageInternal msgI,
+                                       final Message msgI,
                                        final ClientProducerCredits credits,
                                        SendAcknowledgementHandler handler) throws ActiveMQException {
       sendInitialLargeMessageHeader(msgI, credits);
 
-      BodyEncoder context = msgI.getBodyEncoder();
+      LargeBodyEncoder context = msgI.getBodyEncoder();
 
       final long bodySize = context.getLargeBodySize();
-
-      final int reconnectID = sessionContext.getReconnectID();
-
       context.open();
       try {
 
@@ -392,7 +393,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendBuffered(final boolean sendBlocking,
-                                         final MessageInternal msgI,
+                                         final Message msgI,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
       msgI.getBodyBuffer().readerIndex(0);
@@ -407,7 +408,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
-                                         final MessageInternal msgI,
+                                         final Message msgI,
                                          final InputStream inputStreamParameter,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -478,7 +479,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
                msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
 
                msgI.getBodyBuffer().writeBytes(buff, 0, pos);
-               sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler);
+               sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
                return;
             } else {
                if (!headerSent) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+
+   @Override
    public ByteBuffer toByteBuffer() {
       throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
+   /**
+    * Unsupported operation: this implementation unconditionally throws, so no
+    * bytes from {@code src} are ever transferred.
+    *
+    * @param src      the source buffer (ignored)
+    * @param srcIndex the starting index in the source (ignored)
+    * @param length   the number of bytes requested (ignored)
+    * @throws IllegalAccessError always, carrying
+    *                            {@code LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE}
+    */
+   @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+   }
+
    public int writeBytes(final InputStream in, final int length) throws IOException {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
deleted file mode 100644
index baafaac..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-/**
- * Class used to encode message body into buffers.
- * <br>
- * Used to send large streams over the wire
- */
-public interface BodyEncoder {
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   void open() throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   void close() throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   int encode(ByteBuffer bufferRead) throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   long getLargeBodySize();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
new file mode 100644
index 0000000..8b96282
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+
+/**
+ * Interface used to encode a message body into buffers.
+ * <br>
+ * Used to send large streams over the wire.
+ */
+public interface LargeBodyEncoder {
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   void open() throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   void close() throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    *
+    * @param bufferRead NIO buffer taking part in the encoding
+    *                   (NOTE(review): presumably the destination of the body bytes - confirm against implementations)
+    * @return NOTE(review): presumably the number of bytes encoded - confirm against implementations
+    */
+   int encode(ByteBuffer bufferRead) throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    *
+    * @param bufferOut buffer the body bytes are written to
+    * @param size      number of bytes requested for this call
+    * @return NOTE(review): presumably the number of bytes encoded - confirm against implementations
+    */
+   int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    *
+    * @return the size of the large body (NOTE(review): presumably in bytes - confirm)
+    */
+   long getLargeBodySize();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
new file mode 100644
index 0000000..fd09751
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -0,0 +1,1066 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.encode.BodyType;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
+import org.jboss.logging.Logger;
+
+/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
+ *  consumers */
+public class CoreMessage extends RefCountMessage {
+
+   public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+
+   // This is an estimate of how much memory a ServerMessageImpl takes up, excluding body and properties
+   // Note, it is only an estimate, it's not possible to be entirely sure with Java
+   // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+   // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+   private static final int memoryOffset = 352;
+
+   private volatile int memoryEstimate = -1;
+
+
+   private static final Logger logger = Logger.getLogger(CoreMessage.class);
+
+   // There's an integer with the number of bytes for the body
+   public static final int BODY_OFFSET = DataConstants.SIZE_INT;
+
+   /** That is the encode for the whole message, including properties..
+       it does not include the buffer for the Packet send and receive header on core protocol */
+   protected ByteBuf buffer;
+
+   private volatile boolean validBuffer = false;
+
+   protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
+
+   Object body;
+
+   protected int endOfBodyPosition = -1;
+
+   protected int messageIDPosition = -1;
+
+   protected long messageID;
+
+   protected SimpleString address;
+
+   protected byte type;
+
+   protected boolean durable;
+
+   /**
+    * GMT milliseconds at which this message expires. 0 means never expires *
+    */
+   private long expiration;
+
+   protected long timestamp;
+
+   protected byte priority;
+
+   private UUID userID;
+
+   private int propertiesLocation = -1;
+
+   protected volatile TypedProperties properties;
+
+   private Object protocol;
+
+   public CoreMessage() {
+   }
+
+   @Override
+   public CoreMessage setProtocol(Object protocol) {
+      this.protocol = protocol;
+      return this;
+   }
+
+   @Override
+   public Object getProtocol() {
+      return protocol;
+   }
+
+   @Override
+   public Persister<Message> getPersister() {
+      return CoreMessagePersister.getInstance();
+   }
+
+   public CoreMessage initBuffer(final int initialMessageBufferSize) {
+      buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
+
+      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+      buffer.writeByte((byte) 0);
+
+      buffer.setIndex(BODY_OFFSET, BODY_OFFSET);
+
+      return this;
+   }
+
+   @Override
+   public void receiveBuffer(ByteBuf buffer) {
+      this.buffer = buffer;
+      this.buffer.retain();
+      decode();
+      this.validBuffer = true;
+   }
+
+   @Override
+   public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      internalWritableBuffer();
+      return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
+   }
+
+   /**
+    *
+    * @param sendBuffer
+    * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
+    */
+   @Override
+   public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
+      checkEncode();
+      sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
+   }
+
+   private synchronized void checkEncode() {
+      if (!validBuffer) {
+         encode();
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    */
+   @Override
+   public ActiveMQBuffer getBodyBuffer() {
+      // if using the writable buffer, we must parse properties
+      checkProperties();
+
+      internalWritableBuffer();
+
+      return writableBuffer;
+   }
+
+   private void internalWritableBuffer() {
+      if (writableBuffer == null) {
+         writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
+         if (endOfBodyPosition > 0) {
+            writableBuffer.byteBuf().setIndex(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE + BODY_OFFSET);
+            writableBuffer.resetReaderIndex();
+         }
+      }
+   }
+
+   public int getEndOfBodyPosition() {
+      if (endOfBodyPosition < 0) {
+         endOfBodyPosition = getBodyBuffer().writerIndex();
+      }
+      return endOfBodyPosition;
+   }
+
+
+   public TypedProperties getTypedProperties() {
+      return checkProperties();
+   }
+
+
+   @Override
+   public void messageChanged() {
+      validBuffer = false;
+   }
+
+   /**
+    * Copy constructor: duplicates {@code other}, including a copy of its properties.
+    * <p>
+    * The properties of {@code other} are decoded lazily, so its {@code properties} field may still be
+    * {@code null} here. Passing the raw field along would create a copy without properties and without a
+    * recorded properties location, i.e. the copy would silently lose them. The decode is therefore forced
+    * through {@link #checkProperties()} before copying.
+    *
+    * @param other the message to copy
+    */
+   protected CoreMessage(CoreMessage other) {
+      this(other, other.checkProperties());
+   }
+
+   public CoreMessage(long id, int bufferSize) {
+      this.initBuffer(bufferSize);
+      this.setMessageID(id);
+   }
+
+   protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
+      this.body = other.body;
+      this.endOfBodyPosition = other.endOfBodyPosition;
+      this.messageID = other.messageID;
+      this.address = other.address;
+      this.type = other.type;
+      this.durable = other.durable;
+      this.expiration = other.expiration;
+      this.timestamp = other.timestamp;
+      this.priority = other.priority;
+      this.userID = other.userID;
+      if (copyProperties != null) {
+         this.properties = new TypedProperties(copyProperties);
+      }
+      if (other.buffer != null) {
+         this.buffer = other.buffer.copy();
+      }
+   }
+
+   @Override
+   public void copyHeadersAndProperties(final Message msg) {
+      messageID = msg.getMessageID();
+      address = msg.getAddressSimpleString();
+      userID = (UUID)msg.getUserID();
+      type = msg.getType();
+      durable = msg.isDurable();
+      expiration = msg.getExpiration();
+      timestamp = msg.getTimestamp();
+      priority = msg.getPriority();
+
+      if (msg instanceof CoreMessage) {
+         properties = ((CoreMessage)msg).getTypedProperties();
+      } else {
+         // TODO-now: copy stuff
+         logger.warn("Must implement copyHeaderAndProperties for other messages");
+      }
+   }
+
+
+   @Override
+   public Message copy() {
+      return new CoreMessage(this);
+   }
+
+   @Override
+   public Message copy(long newID) {
+      return copy().setMessageID(newID);
+   }
+
+   @Override
+   public long getExpiration() {
+      return expiration;
+   }
+
+   @Override
+   public long getTimestamp() {
+      return timestamp;
+   }
+
+   /**
+    * Sets the message timestamp.
+    * <p>
+    * The timestamp is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is
+    * flagged stale (as {@link #setDurable(boolean)} does); otherwise an already valid buffer would keep
+    * carrying the previous value.
+    *
+    * @param timestamp the new timestamp
+    * @return this message
+    */
+   @Override
+   public CoreMessage setTimestamp(long timestamp) {
+      messageChanged();
+      this.timestamp = timestamp;
+      return this;
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
+   @Override
+   public byte getPriority() {
+      return priority;
+   }
+
+   @Override
+   public UUID getUserID() {
+      return userID;
+   }
+
+   /**
+    * Sets the user id from an untyped reference.
+    * <p>
+    * The user id is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is
+    * flagged stale to keep it from carrying the previous value.
+    *
+    * @param uuid the new user id; must be a {@link UUID} (or {@code null}), anything else fails the cast
+    * @return this message
+    */
+   @Override
+   public CoreMessage setUserID(Object uuid) {
+      messageChanged();
+      this.userID = (UUID)uuid;
+      return this;
+   }
+
+   @Override
+   public CoreMessage setMessageID(long messageID) {
+      this.messageID = messageID;
+      if (messageIDPosition >= 0 && validBuffer) {
+         buffer.setLong(messageIDPosition, messageID);
+      }
+      return this;
+   }
+
+   /**
+    * Sets the address, invalidating the encoded buffer only when the address actually changes.
+    * <p>
+    * The address is nullable (it is encoded and decoded as a nullable string), so the comparison is done
+    * null-safely; calling {@code address.equals(..)} directly threw a {@link NullPointerException} when the
+    * address was cleared on a message whose buffer was valid.
+    *
+    * @param address the new address, may be {@code null}
+    * @return this message
+    */
+   @Override
+   public CoreMessage setAddress(SimpleString address) {
+      final boolean differs = address == null ? this.address != null : !address.equals(this.address);
+      if (validBuffer && differs) {
+         messageChanged();
+      }
+      this.address = address;
+      return this;
+   }
+
+   @Override
+   public SimpleString getAddressSimpleString() {
+      return address;
+   }
+
+
+   /**
+    * Sets the expiration time of the message.
+    * <p>
+    * The expiration is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is
+    * flagged stale (as {@link #setDurable(boolean)} does); otherwise an already valid buffer would keep
+    * carrying the previous value.
+    *
+    * @param expiration GMT milliseconds at which the message expires, 0 meaning it never expires
+    * @return this message
+    */
+   @Override
+   public CoreMessage setExpiration(long expiration) {
+      messageChanged();
+      this.expiration = expiration;
+      return this;
+   }
+
+   /**
+    * Sets the message priority.
+    * <p>
+    * The priority is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is
+    * flagged stale (as {@link #setDurable(boolean)} does); otherwise an already valid buffer would keep
+    * carrying the previous value.
+    *
+    * @param priority the new priority
+    * @return this message
+    */
+   @Override
+   public CoreMessage setPriority(byte priority) {
+      messageChanged();
+      this.priority = priority;
+      return this;
+   }
+
+   /**
+    * Sets the user id.
+    * <p>
+    * The user id is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is
+    * flagged stale to keep it from carrying the previous value.
+    *
+    * @param userID the new user id, may be {@code null}
+    * @return this message
+    */
+   public CoreMessage setUserID(UUID userID) {
+      messageChanged();
+      this.userID = userID;
+      return this;
+   }
+
+   /**
+    * Returns the message properties, decoding them lazily on first use.
+    * <p>
+    * While the {@code properties} field is {@code null}, a new {@link TypedProperties} is created and, when a
+    * buffer and a recorded properties location are available, decoded from a duplicate of the buffer
+    * positioned at that location. The result is then stored in the field.
+    * <p>
+    * NOTE(review): this comment used to say the method is kept synchronized, but it carries no
+    * {@code synchronized} modifier; the field itself is declared {@code volatile}.
+    *
+    * @return the properties held by this message after the lazy decode
+    */
+   protected TypedProperties checkProperties() {
+      if (properties == null) {
+         // populate a local instance first; the field is assigned only after the decode has finished
+         TypedProperties properties = new TypedProperties();
+         if (buffer != null && propertiesLocation >= 0) {
+            // duplicate() leaves the indexes of the shared buffer untouched
+            properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
+         }
+         this.properties = properties;
+      }
+
+      return this.properties;
+   }
+
+
+   /**
+    * Returns an estimate of the memory used by this message.
+    * <p>
+    * The value is computed on the first call and cached; {@code -1} marks "not computed yet". It is the sum
+    * of the fixed {@code memoryOffset}, the capacity of the buffer and the memory offset reported by the
+    * properties. A missing buffer or not yet decoded properties contribute 0.
+    * <p>
+    * The previous version dumped a stack trace to stderr whenever the buffer or the properties were
+    * {@code null}. Both are regular states (properties are decoded lazily), so that debugging residue was
+    * removed.
+    *
+    * @return the estimated memory footprint
+    */
+   @Override
+   public int getMemoryEstimate() {
+      if (memoryEstimate == -1) {
+         // read the volatile field once so the null check and the call see the same instance
+         final TypedProperties currentProperties = properties;
+         final int bufferSize = buffer != null ? buffer.capacity() : 0;
+         final int propertiesSize = currentProperties != null ? currentProperties.getMemoryOffset() : 0;
+
+         memoryEstimate = memoryOffset + bufferSize + propertiesSize;
+      }
+
+      return memoryEstimate;
+   }
+
+   @Override
+   public byte getType() {
+      return type;
+   }
+
+   /**
+    * Sets the message type byte.
+    * <p>
+    * The type is written by {@link #encodeHeadersAndProperties(ByteBuf)}, so the encoded buffer is flagged
+    * stale (as {@link #setDurable(boolean)} does); otherwise an already valid buffer would keep carrying the
+    * previous value.
+    *
+    * @param type the new message type
+    * @return this message
+    */
+   @Override
+   public CoreMessage setType(byte type) {
+      messageChanged();
+      this.type = type;
+      return this;
+   }
+
+   private void decode() {
+      endOfBodyPosition = buffer.readInt();
+
+      buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
+
+      decodeHeadersAndProperties(buffer, true);
+      buffer.readerIndex(0);
+
+      internalWritableBuffer();
+   }
+
+
+   public void decodeHeadersAndProperties(final ByteBuf buffer) {
+      decodeHeadersAndProperties(buffer, false);
+   }
+
+   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+      messageIDPosition = buffer.readerIndex();
+      messageID = buffer.readLong();
+
+      address = SimpleString.readNullableSimpleString(buffer);
+      if (buffer.readByte() == DataConstants.NOT_NULL) {
+         byte[] bytes = new byte[16];
+         buffer.readBytes(bytes);
+         userID = new UUID(UUID.TYPE_TIME_BASED, bytes);
+      } else {
+         userID = null;
+      }
+      type = buffer.readByte();
+      durable = buffer.readBoolean();
+      expiration = buffer.readLong();
+      timestamp = buffer.readLong();
+      priority = buffer.readByte();
+      if (lazyProperties) {
+         properties = null;
+         propertiesLocation = buffer.readerIndex();
+      } else {
+         properties = new TypedProperties();
+         properties.decode(buffer);
+      }
+   }
+
+
+   /**
+    * Encodes this message into {@code buffer} and flags the buffer as valid.
+    * <p>
+    * The layout written here is: an int holding {@code endOfBodyPosition} at index 0, then the body bytes
+    * already present in the buffer (they are skipped over, not rewritten), then the headers and properties
+    * written by {@link #encodeHeadersAndProperties(ByteBuf)}.
+    *
+    * @return this message
+    */
+   public synchronized CoreMessage encode() {
+
+      // properties are decoded lazily; make sure they exist before they are written back below
+      checkProperties();
+
+      if (writableBuffer != null) {
+         // The message encode takes into consideration the PacketImpl which is not part of this encoding
+         // so we always need to take the BUFFER_HEADER_SPACE from packet impl into consideration
+         // NOTE(review): the literal 4 presumably stands for the leading int (DataConstants.SIZE_INT) - confirm
+         endOfBodyPosition = writableBuffer.writerIndex() + BUFFER_HEADER_SPACE - 4;
+      } else if (endOfBodyPosition <= 0) {
+         // no positive end-of-body position has been recorded (the field starts at -1)
+         endOfBodyPosition = BUFFER_HEADER_SPACE;
+      }
+
+      // rewind and store the end-of-body position as the first int of the buffer
+      buffer.setIndex(0, 0);
+      buffer.writeInt(endOfBodyPosition);
+
+      // The end of body position
+      buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+
+      // headers and properties go right after the body
+      encodeHeadersAndProperties(buffer);
+
+      validBuffer = true;
+
+      return this;
+   }
+
+   public void encodeHeadersAndProperties(final ByteBuf buffer) {
+      checkProperties();
+      messageIDPosition = buffer.writerIndex();
+      buffer.writeLong(messageID);
+      SimpleString.writeNullableSimpleString(buffer, address);
+      if (userID == null) {
+         buffer.writeByte(DataConstants.NULL);
+      } else {
+         buffer.writeByte(DataConstants.NOT_NULL);
+         buffer.writeBytes(userID.asBytes());
+      }
+      buffer.writeByte(type);
+      buffer.writeBoolean(durable);
+      buffer.writeLong(expiration);
+      buffer.writeLong(timestamp);
+      buffer.writeByte(priority);
+      properties.encode(buffer);
+   }
+
+   @Override
+   public Object getBody() {
+
+      if (body == null) {
+         decodeBody();
+      }
+
+      return body;
+   }
+
+   private void decodeBody() {
+      buffer.readerIndex(DataConstants.SIZE_INT);
+      switch (getBodyType()) {
+         case Text:
+            body = SimpleString.readNullableSimpleString(buffer);
+            break;
+
+         default:
+            break;
+      }
+   }
+
+   public int getHeadersAndPropertiesEncodeSize() {
+      return DataConstants.SIZE_LONG + // Message ID
+         DataConstants.SIZE_BYTE + // user id null?
+         (userID == null ? 0 : 16) +
+             /* address */SimpleString.sizeofNullableString(address) +
+         DataConstants./* Type */SIZE_BYTE +
+         DataConstants./* Durable */SIZE_BOOLEAN +
+         DataConstants./* Expiration */SIZE_LONG +
+         DataConstants./* Timestamp */SIZE_LONG +
+         DataConstants./* Priority */SIZE_BYTE +
+             /* PropertySize and Properties */checkProperties().getEncodeSize();
+   }
+
+   @Override
+   public BodyType getBodyType() {
+      return getBodyType(type);
+   }
+
+   /**
+    * Maps a core message type byte to its {@link BodyType}.
+    * <p>
+    * {@link Message#DEFAULT_TYPE} and any unrecognised type byte map to {@link BodyType#Undefined}.
+    * <p>
+    * NOTE(review): {@link Message#BYTES_TYPE} is reported as {@link BodyType#Text}, exactly as before. That
+    * looks unintentional; confirm whether a dedicated body type was meant.
+    *
+    * @param type the message type byte
+    * @return the matching body type, {@link BodyType#Undefined} when there is none
+    */
+   public static BodyType getBodyType(byte type) {
+      switch (type) {
+         case Message.OBJECT_TYPE:
+            return BodyType.Object;
+
+         // both the text and the bytes type are reported as Text
+         case Message.TEXT_TYPE:
+         case Message.BYTES_TYPE:
+            return BodyType.Text;
+
+         case Message.MAP_TYPE:
+            return BodyType.Map;
+
+         case Message.STREAM_TYPE:
+            return BodyType.Stream;
+
+         // the default type and anything unrecognised
+         case Message.DEFAULT_TYPE:
+         default:
+            return BodyType.Undefined;
+      }
+   }
+
+   @Override
+   public int getEncodeSize() {
+      checkEncode();
+      return buffer == null ? -1 : buffer.writerIndex();
+   }
+
+   /**
+    * Stores the body object on this message and flags the encoded buffer as stale.
+    * <p>
+    * NOTE(review): {@code bodyType} is not used; the message type is always set to
+    * {@link Message#TEXT_TYPE}. Confirm whether other body types are meant to be supported here.
+    *
+    * @param bodyType requested body type (currently ignored)
+    * @param body     the body object to keep on this message
+    * @return this message
+    */
+   @Override
+   public CoreMessage setBody(final BodyType bodyType, Object body) {
+      messageChanged();
+
+      // the type is forced to text regardless of bodyType
+      this.type = Message.TEXT_TYPE;
+      this.body = body;
+
+      return this;
+   }
+
+   @Override
+   public boolean isLargeMessage() {
+      return false;
+   }
+
+   private void encodeBody(ByteBuf intoBuffer) {
+      intoBuffer.writerIndex(DataConstants.SIZE_INT);
+
+      switch (getBodyType()) {
+
+         // TODO-now implement other types
+         case Text:
+            SimpleString.writeNullableSimpleString(intoBuffer, SimpleString.toSimpleString(body == null ? null : body.toString()));
+            break;
+
+         default:
+            break;
+      }
+
+
+      endOfBodyPosition = buffer.writerIndex() + BUFFER_HEADER_SPACE;
+      buffer.setInt(0, endOfBodyPosition);
+   }
+
+   /**
+    * Returns the address as a plain {@link String}.
+    *
+    * @return the string form of the address, or {@code null} when no address is set
+    */
+   @Override
+   public String getAddress() {
+      return address == null ? null : address.toString();
+   }
+
+   @Override
+   public CoreMessage setAddress(String address) {
+      messageChanged();
+      this.address = SimpleString.toSimpleString(address);
+      return this;
+   }
+
+   @Override
+   public CoreMessage setBuffer(ByteBuf buffer) {
+      this.buffer = buffer;
+
+      return this;
+   }
+
+   @Override
+   public ByteBuf getBuffer() {
+      return buffer;
+   }
+
+   @Override
+   public boolean isDurable() {
+      return durable;
+   }
+
+   @Override
+   public CoreMessage setDurable(boolean durable) {
+      messageChanged();
+      this.durable = durable;
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putBooleanProperty(final String key, final boolean value) {
+      messageChanged();
+      checkProperties();
+      properties.putBooleanProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) {
+      messageChanged();
+      checkProperties();
+      properties.putBooleanProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBooleanProperty(key);
+   }
+
+   @Override
+   public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBooleanProperty(new SimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putByteProperty(final SimpleString key, final byte value) {
+      messageChanged();
+      checkProperties();
+      properties.putByteProperty(key, value);
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putByteProperty(final String key, final byte value) {
+      messageChanged();
+      checkProperties();
+      properties.putByteProperty(new SimpleString(key), value);
+
+      return this;
+   }
+
+
+   @Override
+   public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getByteProperty(key);
+   }
+
+   @Override
+   public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getByteProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) {
+      messageChanged();
+      checkProperties();
+      properties.putBytesProperty(key, value);
+
+      return this;
+   }
+
+   @Override
+   public CoreMessage putBytesProperty(final String key, final byte[] value) {
+      messageChanged();
+      checkProperties();
+      properties.putBytesProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   @Override
+   public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getBytesProperty(key);
+   }
+
+   @Override
+   public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getBytesProperty(new SimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putCharProperty(SimpleString key, char value) {
+      messageChanged();
+      checkProperties();
+      properties.putCharProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putCharProperty(String key, char value) {
+      messageChanged();
+      checkProperties();
+      properties.putCharProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putShortProperty(final SimpleString key, final short value) {
+      messageChanged();
+      checkProperties();
+      properties.putShortProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putShortProperty(final String key, final short value) {
+      messageChanged();
+      checkProperties();
+      properties.putShortProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   @Override
+   public CoreMessage putIntProperty(final SimpleString key, final int value) {
+      messageChanged();
+      checkProperties();
+      properties.putIntProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putIntProperty(final String key, final int value) {
+      messageChanged();
+      checkProperties();
+      properties.putIntProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getIntProperty(key);
+   }
+
+   @Override
+   public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getIntProperty(SimpleString.toSimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putLongProperty(final SimpleString key, final long value) {
+      messageChanged();
+      checkProperties();
+      properties.putLongProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putLongProperty(final String key, final long value) {
+      messageChanged();
+      checkProperties();
+      properties.putLongProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getLongProperty(key);
+   }
+
+   @Override
+   public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return getLongProperty(SimpleString.toSimpleString(key));
+   }
+
+
+   @Override
+   public CoreMessage putFloatProperty(final SimpleString key, final float value) {
+      messageChanged();
+      checkProperties();
+      properties.putFloatProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putFloatProperty(final String key, final float value) {
+      messageChanged();
+      checkProperties();
+      properties.putFloatProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putDoubleProperty(final SimpleString key, final double value) {
+      messageChanged();
+      checkProperties();
+      properties.putDoubleProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putDoubleProperty(final String key, final double value) {
+      messageChanged();
+      checkProperties();
+      properties.putDoubleProperty(new SimpleString(key), value);
+      return this;
+   }
+
+
+   /**
+    * Returns the value of a double property.
+    * <p>
+    * This is a read-only accessor: unlike the previous version it no longer calls {@code messageChanged()},
+    * which invalidated the encoded buffer and forced a needless re-encode after every read. It now matches
+    * the other typed getters.
+    *
+    * @param key the property name
+    * @return the property value as returned by the underlying {@link TypedProperties}
+    * @throws ActiveMQPropertyConversionException if the stored value cannot be converted to a double
+    */
+   @Override
+   public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getDoubleProperty(key);
+   }
+
+   @Override
+   public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return getDoubleProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(key, value);
+      return this;
+   }
+
+   @Override
+   public CoreMessage putStringProperty(final String key, final String value) {
+      messageChanged();
+      checkProperties();
+      properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value));
+      return this;
+   }
+
+   @Override
+   public CoreMessage putObjectProperty(final SimpleString key,
+                                        final Object value) throws ActiveMQPropertyConversionException {
+      messageChanged();
+      checkProperties();
+      TypedProperties.setObjectProperty(key, value, properties);
+      return this;
+   }
+
+   @Override
+   public Object getObjectProperty(final String key) {
+      checkProperties();
+      return getObjectProperty(SimpleString.toSimpleString(key));
+   }
+
+   @Override
+   public Object getObjectProperty(final SimpleString key) {
+      checkProperties();
+      return properties.getProperty(key);
+   }
+
+   @Override
+   public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
+      messageChanged();
+      putObjectProperty(new SimpleString(key), value);
+      return this;
+   }
+
+   @Override
+   public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getShortProperty(key);
+   }
+
+   @Override
+   public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getShortProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getFloatProperty(key);
+   }
+
+   @Override
+   public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getFloatProperty(new SimpleString(key));
+   }
+
+   /**
+    * Returns a string property as a plain {@link String}.
+    *
+    * @param key the property name
+    * @return the string form of the property, or {@code null} when the lookup yields {@code null}
+    * @throws ActiveMQPropertyConversionException if the stored value cannot be converted
+    */
+   @Override
+   public String getStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      final SimpleString value = getSimpleStringProperty(key);
+      return value == null ? null : value.toString();
+   }
+
+   @Override
+   public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
+      return getStringProperty(new SimpleString(key));
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getSimpleStringProperty(key);
+   }
+
+   @Override
+   public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
+      checkProperties();
+      return properties.getSimpleStringProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Object removeProperty(final SimpleString key) {
+      checkProperties();
+      Object oldValue = properties.removeProperty(key);
+      if (oldValue != null) {
+         messageChanged();
+      }
+      return oldValue;
+   }
+
+   /**
+    * Removes a property by name.
+    * <p>
+    * The encoded buffer is flagged stale only when a property was actually removed, matching
+    * {@link #removeProperty(SimpleString)}. The previous version also invalidated the buffer up front, so
+    * removing a property that did not exist still forced a re-encode.
+    *
+    * @param key the property name
+    * @return the value that was removed, or {@code null} when nothing was removed
+    */
+   @Override
+   public Object removeProperty(final String key) {
+      checkProperties();
+      Object oldValue = properties.removeProperty(new SimpleString(key));
+      if (oldValue != null) {
+         messageChanged();
+      }
+      return oldValue;
+   }
+
+   @Override
+   public boolean containsProperty(final SimpleString key) {
+      checkProperties();
+      return properties.containsProperty(key);
+   }
+
+   @Override
+   public boolean containsProperty(final String key) {
+      checkProperties();
+      return properties.containsProperty(new SimpleString(key));
+   }
+
+   @Override
+   public Set<SimpleString> getPropertyNames() {
+      checkProperties();
+      return properties.getPropertyNames();
+   }
+
+   @Override
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
+      return new DecodingContext();
+   }
+
+   private final class DecodingContext implements LargeBodyEncoder {
+
+      private int lastPos = 0;
+
+      private DecodingContext() {
+      }
+
+      @Override
+      public void open() {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public long getLargeBodySize() {
+         return buffer.writerIndex();
+      }
+
+      @Override
+      public int encode(final ByteBuffer bufferRead) throws ActiveMQException {
+         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bufferRead);
+         return encode(buffer, bufferRead.capacity());
+      }
+
+      @Override
+      public int encode(final ActiveMQBuffer bufferOut, final int size) {
+         bufferOut.byteBuf().writeBytes(buffer, lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
+
+   @Override
+   public int getPersistSize() {
+      checkEncode();
+      return buffer.writerIndex() + DataConstants.SIZE_INT;
+   }
+
+   /**
+    * Writes this message to a persistence record: an int with the encoded length followed by the encoded bytes.
+    * <p>
+    * The buffer is (re-)encoded first when it is stale, as {@link #getPersistSize()} does, so the record
+    * never contains an outdated encoding and the length written here agrees with the reported size.
+    *
+    * @param targetRecord the record buffer to write to
+    */
+   @Override
+   public void persist(ActiveMQBuffer targetRecord) {
+      checkEncode();
+      final int encodedSize = buffer.writerIndex();
+      targetRecord.writeInt(encodedSize);
+      targetRecord.writeBytes(buffer, 0, encodedSize);
+   }
+
+   @Override
+   public void reloadPersistence(ActiveMQBuffer record) {
+      int size = record.readInt();
+      initBuffer(size);
+      buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
+      decode();
+
+   }
+
+   @Override
+   public Message toCore() {
+      return this;
+   }
+
+
+
+   /**
+    * Returns a diagnostic description of this message.
+    * <p>
+    * The properties field is concatenated directly instead of calling {@code properties.toString()}. The
+    * properties are decoded lazily and may still be {@code null}, which used to throw and reduce the output
+    * to the short fallback form. An undecoded message now prints {@code properties=null} and keeps the
+    * remaining details. The fallback also names this class rather than the stale "ServerMessage".
+    */
+   @Override
+   public String toString() {
+      try {
+         return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority()  +
+            ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
+            ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties + "]@" + System.identityHashCode(this);
+      } catch (Throwable e) {
+         // toString must never fail; fall back to the bare identifier
+         return "CoreMessage[messageID=" + messageID + "]";
+      }
+   }
+
+
+   /**
+    * Formats a millisecond timestamp for {@link #toString()}.
+    *
+    * @param timestamp milliseconds value to format
+    * @return {@code "0"} for a zero timestamp, otherwise the {@link java.util.Date} string form
+    */
+   private static String toDate(long timestamp) {
+      // zero is printed as-is rather than as a date
+      return timestamp == 0 ? "0" : new java.util.Date(timestamp).toString();
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
new file mode 100644
index 0000000..ddf39d2
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * {@link Persister} for {@link Message} records stored in the core format.
+ * <p>
+ * Record layout produced by {@link #encode(ActiveMQBuffer, Message)}:
+ * <ol>
+ *    <li>one byte: the protocol-id marker</li>
+ *    <li>one long: the message id</li>
+ *    <li>a nullable {@link SimpleString}: the address</li>
+ *    <li>the message content, as written by {@link Message#persist(ActiveMQBuffer)}</li>
+ * </ol>
+ */
+public class CoreMessagePersister implements Persister<Message> {
+
+   /** Marker byte written first in every record; identifies which persister (protocol) wrote it. */
+   private static final byte PROTOCOL_ID = 1;
+
+   /** Shared instance; declared final so it cannot be swapped out at runtime. */
+   public static final CoreMessagePersister theInstance = new CoreMessagePersister();
+
+   /**
+    * @return the shared persister instance
+    */
+   public static CoreMessagePersister getInstance() {
+      return theInstance;
+   }
+
+   /** Protected so that subclasses can be created; other callers use {@link #getInstance()}. */
+   protected CoreMessagePersister() {
+   }
+
+
+   /**
+    * Computes the number of bytes {@link #encode(ActiveMQBuffer, Message)} writes for the record:
+    * the marker byte, the message id, the nullable address and the persisted message content.
+    *
+    * @param record the message to be measured
+    * @return the encoded size in bytes
+    */
+   @Override
+   public int getEncodeSize(Message record) {
+      return DataConstants.SIZE_BYTE + record.getPersistSize() +
+         SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG;
+   }
+
+
+   /**
+    * Writes the record in the layout described on the class.
+    * <p>
+    * Sub classes must add the first byte as the protocol-id.
+    *
+    * @param buffer the destination buffer
+    * @param record the message to write
+    */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      buffer.writeByte(PROTOCOL_ID);
+      buffer.writeLong(record.getMessageID());
+      buffer.writeNullableSimpleString(record.getAddressSimpleString());
+      record.persist(buffer);
+   }
+
+
+   /**
+    * Reads a record written by {@link #encode(ActiveMQBuffer, Message)} into a new {@link CoreMessage}.
+    * <p>
+    * The caller must have consumed the leading protocol-id byte already, as that byte is used
+    * to decide which persister (protocol) to use.
+    *
+    * @param buffer the source buffer, positioned just after the protocol-id byte
+    * @param record ignored; a fresh message is always created
+    * @return the newly created message with its id and address restored
+    */
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      final long id = buffer.readLong();
+      final SimpleString address = buffer.readNullableSimpleString();
+      // Use a local rather than reassigning the parameter, which is never read.
+      final Message decoded = new CoreMessage();
+      decoded.reloadPersistence(buffer);
+      decoded.setMessageID(id);
+      decoded.setAddress(address);
+      return decoded;
+   }
+}