You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:38 UTC

[20/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 80116ed..c7a831b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import java.io.InputStream;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.activemq.artemis.utils.UUID;
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.persistence.Persister;
 
 /**
  * A Message is a routable instance that has a payload.
@@ -48,9 +51,41 @@ import org.apache.activemq.artemis.utils.UUID;
  * <p>
  * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a
  * {@code boolean}), a {@link ActiveMQPropertyConversionException} will be thrown.
+ *
+ *
+ * Use cases that will be covered by Message
+ *
+ * Receiving a buffer:
+ *
+ * Message encode = new CoreMessage(); // or any other implementation
+ * encode.receiveBuffer(buffer);
+ *
+ *
+ * Sending to a buffer:
+ *
+ * Message encode;
+ * size = encode.getEncodeSize();
+ * encode.encodeDirectly(bufferOutput);
+ *
  */
 public interface Message {
 
+   // This is an estimate of how much memory a Message takes up, excluding body and properties
+   // Note, it is only an estimate, it's not possible to be entirely sure with Java
+   // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
+   // The value is somewhat higher on 64 bit architectures, probably due to different alignment
+   int memoryOffset = 352;
+
+
+   SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
+
+   SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_AMQ_SCALEDOWN_TO");
+
+   SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_AMQ_ACK_ROUTE_TO");
+
+   // used by the bridges to set duplicates
+   SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_AMQ_BRIDGE_DUP");
+
    /**
     * the actual time the message was expired.
     * * *
@@ -129,6 +164,91 @@ public interface Message {
 
    byte STREAM_TYPE = 6;
 
+
+   default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
+      Object obj = getDeliveryAnnotationProperty(property);
+      if (obj instanceof SimpleString) {
+         return (SimpleString)obj;
+      } else {
+         return SimpleString.toSimpleString(obj.toString());
+      }
+   }
+
+   default void cleanupInternalProperties() {
+      // only on core
+   }
+
+   RoutingType getRouteType();
+
+   boolean containsDeliveryAnnotationProperty(SimpleString property);
+
+   /**
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
+   @Deprecated
+   default InputStream getBodyInputStream() {
+      return null;
+   }
+
+   /**
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
+   @Deprecated
+   default ActiveMQBuffer getBodyBuffer() {
+      return null;
+   }
+
+      /**
+       * @deprecated do not use this, use through ICoreMessage or ClientMessage
+       */
+   @Deprecated
+   default byte getType() {
+      return (byte)0;
+   }
+
+   /**
+    * @deprecated do not use this, use through ICoreMessage or ClientMessage
+    */
+   @Deprecated
+   default Message setType(byte type) {
+      return this;
+   }
+
+
+   void messageChanged();
+
+   /** Used to calculate what is the delivery time.
+    *  Return null if not scheduled. */
+   Long getScheduledDeliveryTime();
+
+   default Message setScheduledDeliveryTime(Long time) {
+      return this;
+   }
+
+   /** Context can be used by the application server to inject extra control, like something protocol-specific on the server.
+    * There is only one per Object, use it wisely!
+    *
+    * Note: the intent of this was to replace the PageStore reference on Message, but it will later be extended by adding a ServerPojo
+    * */
+   RefCountMessageListener getContext();
+
+   SimpleString getReplyTo();
+
+   Message setReplyTo(SimpleString address);
+
+   Message setContext(RefCountMessageListener context);
+
+   /** The buffer will belong to this message, until release is called. */
+   Message setBuffer(ByteBuf buffer);
+
+   ByteBuf getBuffer();
+
+   /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+   Message copy();
+
+   /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
+   Message copy(long newID);
+
    /**
     * Returns the messageID.
     * <br>
@@ -136,39 +256,45 @@ public interface Message {
     */
    long getMessageID();
 
+   Message setMessageID(long id);
+
+   default boolean isLargeMessage() {
+      return false;
+   }
+
    /**
-    * Returns the userID - this is an optional user specified UUID that can be set to identify the message
-    * and will be passed around with the message
-    *
-    * @return the user id
+    * Returns the expiration time of this message.
     */
-   UUID getUserID();
+   long getExpiration();
 
    /**
-    * Sets the user ID
+    * Sets the expiration of this message.
     *
-    * @param userID
+    * @param expiration expiration time
     */
-   Message setUserID(UUID userID);
+   Message setExpiration(long expiration);
 
    /**
-    * Returns the address this message is sent to.
+    * Returns whether this message is expired or not.
     */
-   SimpleString getAddress();
+   default boolean isExpired() {
+      if (getExpiration() == 0) {
+         return false;
+      }
+
+      return System.currentTimeMillis() - getExpiration() >= 0;
+   }
+
 
    /**
-    * Sets the address to send this message to.
+    * Returns the userID - this is an optional user specified UUID that can be set to identify the message
+    * and will be passed around with the message
     *
-    * @param address address to send the message to
+    * @return the user id
     */
-   Message setAddress(SimpleString address);
+   Object getUserID();
 
-   /**
-    * Returns this message type.
-    * <p>
-    * See fields {@literal *_TYPE} for possible values.
-    */
-   byte getType();
+   Message setUserID(Object userID);
 
    /**
     * Returns whether this message is durable or not.
@@ -182,36 +308,18 @@ public interface Message {
     */
    Message setDurable(boolean durable);
 
-   /**
-    * Returns the expiration time of this message.
-    */
-   long getExpiration();
+   Persister<Message> getPersister();
 
-   /**
-    * Returns whether this message is expired or not.
-    */
-   boolean isExpired();
+   String getAddress();
 
-   /**
-    * Sets the expiration of this message.
-    *
-    * @param expiration expiration time
-    */
-   Message setExpiration(long expiration);
+   Message setAddress(String address);
+
+   SimpleString getAddressSimpleString();
+
+   Message setAddress(SimpleString address);
 
-   /**
-    * Returns the message timestamp.
-    * <br>
-    * The timestamp corresponds to the time this message
-    * was handled by an ActiveMQ Artemis server.
-    */
    long getTimestamp();
 
-   /**
-    * Sets the message timestamp.
-    *
-    * @param timestamp timestamp
-    */
    Message setTimestamp(long timestamp);
 
    /**
@@ -230,164 +338,116 @@ public interface Message {
     */
    Message setPriority(byte priority);
 
-   /**
-    * Returns the size of the <em>encoded</em> message.
-    */
-   int getEncodeSize();
+   /** Used to receive this message from an encoded medium buffer */
+   void receiveBuffer(ByteBuf buffer);
 
-   /**
-    * Returns whether this message is a <em>large message</em> or a regular message.
-    */
-   boolean isLargeMessage();
+   /** Used to send this message to an encoded medium buffer.
+    * @param buffer the buffer used.
+    * @param deliveryCount Some protocols (AMQP) will have this as part of the message. */
+   void sendBuffer(ByteBuf buffer, int deliveryCount);
 
-   /**
-    * Returns the message body as an ActiveMQBuffer
-    */
-   ActiveMQBuffer getBodyBuffer();
+   int getPersistSize();
 
-   /**
-    * Writes the input byte array to the message body ActiveMQBuffer
-    */
-   Message writeBodyBufferBytes(byte[] bytes);
+   void persist(ActiveMQBuffer targetRecord);
 
-   /**
-    * Writes the input String to the message body ActiveMQBuffer
-    */
-   Message writeBodyBufferString(String string);
+   void reloadPersistence(ActiveMQBuffer record);
 
-   /**
-    * Returns a <em>copy</em> of the message body as an ActiveMQBuffer. Any modification
-    * of this buffer should not impact the underlying buffer.
-    */
-   ActiveMQBuffer getBodyBufferDuplicate();
+   default void releaseBuffer() {
+      ByteBuf buffer = getBuffer();
+      if (buffer != null) {
+         buffer.release();
+      }
+      setBuffer(null);
+   }
+   default void referenceOriginalMessage(final Message original, String originalQueue) {
+      String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
 
-   // Properties
-   // -----------------------------------------------------------------
+      if (queueOnMessage != null) {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+      } else if (originalQueue != null) {
+         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+      }
 
-   /**
-    * Puts a boolean property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putBooleanProperty(SimpleString key, boolean value);
+      if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
 
-   /**
-    * @see #putBooleanProperty(SimpleString, boolean)
-    */
-   Message putBooleanProperty(String key, boolean value);
+         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+      } else {
+         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
 
-   /**
-    * Puts a byte property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putByteProperty(SimpleString key, byte value);
+         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+      }
 
-   /**
-    * @see #putByteProperty(SimpleString, byte)
-    */
-   Message putByteProperty(String key, byte value);
+      // reset expiry
+      setExpiration(0);
+   }
 
    /**
-    * Puts a byte[] property in this message.
-    *
-    * @param key   property name
-    * @param value property value
+    * It will translate the property named HDR_DUPLICATE_DETECTION_ID into a byte array.
+    * @return the duplicate ID as bytes, or null if the property is not set
     */
-   Message putBytesProperty(SimpleString key, byte[] value);
+   default byte[] getDuplicateIDBytes() {
+      Object duplicateID = getDuplicateProperty();
 
-   /**
-    * @see #putBytesProperty(SimpleString, byte[])
-    */
-   Message putBytesProperty(String key, byte[] value);
+      if (duplicateID == null) {
+         return null;
+      } else {
+         if (duplicateID instanceof SimpleString) {
+            return ((SimpleString) duplicateID).getData();
+         } else if (duplicateID instanceof String) {
+            return new SimpleString(duplicateID.toString()).getData();
+         } else {
+            return (byte[]) duplicateID;
+         }
+      }
+   }
 
    /**
-    * Puts a short property in this message.
-    *
-    * @param key   property name
-    * @param value property value
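+    * Returns the delivery annotation property named HDR_DUPLICATE_DETECTION_ID, without translating it.
+    * @return the property value as returned by getDeliveryAnnotationProperty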
     */
-   Message putShortProperty(SimpleString key, short value);
+   default Object getDuplicateProperty() {
+      return getDeliveryAnnotationProperty(Message.HDR_DUPLICATE_DETECTION_ID);
+   }
 
-   /**
-    * @see #putShortProperty(SimpleString, short)
-    */
-   Message putShortProperty(String key, short value);
 
-   /**
-    * Puts a char property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putCharProperty(SimpleString key, char value);
+   Message putBooleanProperty(String key, boolean value);
 
-   /**
-    * @see #putCharProperty(SimpleString, char)
-    */
-   Message putCharProperty(String key, char value);
+   Message putByteProperty(String key, byte value);
 
-   /**
-    * Puts an int property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putIntProperty(SimpleString key, int value);
+   Message putBytesProperty(String key, byte[] value);
 
-   /**
-    * @see #putIntProperty(SimpleString, int)
-    */
-   Message putIntProperty(String key, int value);
+   Message putShortProperty(String key, short value);
 
-   /**
-    * Puts a long property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putLongProperty(SimpleString key, long value);
+   Message putCharProperty(String key, char value);
 
-   /**
-    * @see #putLongProperty(SimpleString, long)
-    */
-   Message putLongProperty(String key, long value);
+   Message putIntProperty(String key, int value);
 
-   /**
-    * Puts a float property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putFloatProperty(SimpleString key, float value);
+   Message putLongProperty(String key, long value);
 
-   /**
-    * @see #putFloatProperty(SimpleString, float)
-    */
    Message putFloatProperty(String key, float value);
 
-   /**
-    * Puts a double property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putDoubleProperty(SimpleString key, double value);
-
-   /**
-    * @see #putDoubleProperty(SimpleString, double)
-    */
    Message putDoubleProperty(String key, double value);
 
-   /**
-    * Puts a SimpleString property in this message.
-    *
-    * @param key   property name
-    * @param value property value
-    */
-   Message putStringProperty(SimpleString key, SimpleString value);
+
+
+   Message putBooleanProperty(SimpleString key, boolean value);
+
+   Message putByteProperty(SimpleString key, byte value);
+
+   Message putBytesProperty(SimpleString key, byte[] value);
+
+   Message putShortProperty(SimpleString key, short value);
+
+   Message putCharProperty(SimpleString key, char value);
+
+   Message putIntProperty(SimpleString key, int value);
+
+   Message putLongProperty(SimpleString key, long value);
+
+   Message putFloatProperty(SimpleString key, float value);
+
+   Message putDoubleProperty(SimpleString key, double value);
 
    /**
     * Puts a String property in this message.
@@ -397,202 +457,127 @@ public interface Message {
     */
    Message putStringProperty(String key, String value);
 
-   /**
-    * Puts an Object property in this message. <br>
-    * Accepted types are:
-    * <ul>
-    * <li>Boolean</li>
-    * <li>Byte</li>
-    * <li>Short</li>
-    * <li>Character</li>
-    * <li>Integer</li>
-    * <li>Long</li>
-    * <li>Float</li>
-    * <li>Double</li>
-    * <li>String</li>
-    * <li>SimpleString</li>
-    * </ul>
-    * Using any other type will throw a PropertyConversionException.
-    *
-    * @param key   property name
-    * @param value property value
-    * @throws ActiveMQPropertyConversionException if the value is not one of the accepted property
-    *                                             types.
-    */
-   Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #putObjectProperty(SimpleString, Object)
-    */
    Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Removes the property corresponding to the specified key.
-    *
-    * @param key property name
-    * @return the value corresponding to the specified key or @{code null}
-    */
-   Object removeProperty(SimpleString key);
+   Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException;
 
-   /**
-    * @see #removeProperty(SimpleString)
-    */
    Object removeProperty(String key);
 
-   /**
-    * Returns {@code true} if this message contains a property with the given key, {@code false} else.
-    *
-    * @param key property name
-    */
-   boolean containsProperty(SimpleString key);
-
-   /**
-    * @see #containsProperty(SimpleString)
-    */
    boolean containsProperty(String key);
 
-   /**
-    * Returns the property corresponding to the specified key as a Boolean.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Boolean
-    */
-   Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getBooleanProperty(SimpleString)
-    */
    Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Byte.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Byte
-    */
-   Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getByteProperty(SimpleString)
-    */
    Byte getByteProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Double.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Double
-    */
-   Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getDoubleProperty(SimpleString)
-    */
    Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as an Integer.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to an Integer
-    */
-   Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getIntProperty(SimpleString)
-    */
    Integer getIntProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Long.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Long
-    */
-   Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * @see #getLongProperty(SimpleString)
-    */
    Long getLongProperty(String key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key
-    */
+   Object getObjectProperty(String key);
+
+   Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
+
+   Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
+
+   String getStringProperty(String key) throws ActiveMQPropertyConversionException;
+
+   SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
+
+   byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+
+   Object removeProperty(SimpleString key);
+
+   boolean containsProperty(SimpleString key);
+
+   Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
+   Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+
    Object getObjectProperty(SimpleString key);
 
-   /**
-    * @see #getBooleanProperty(SimpleString)
-    */
-   Object getObjectProperty(String key);
+   Object removeDeliveryAnnoationProperty(SimpleString key);
 
-   /**
-    * Returns the property corresponding to the specified key as a Short.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Short
-    */
-   Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
+   Object getDeliveryAnnotationProperty(SimpleString key);
 
-   /**
-    * @see #getShortProperty(SimpleString)
-    */
-   Short getShortProperty(String key) throws ActiveMQPropertyConversionException;
+   Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * Returns the property corresponding to the specified key as a Float.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a Float
-    */
    Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * @see #getFloatProperty(SimpleString)
-    */
-   Float getFloatProperty(String key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * Returns the property corresponding to the specified key as a String.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a String
-    */
    String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * @see #getStringProperty(SimpleString)
-    */
-   String getStringProperty(String key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * Returns the property corresponding to the specified key as a SimpleString.
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a SimpleString
-    */
    SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
-   /**
-    * @see #getSimpleStringProperty(SimpleString)
-    */
-   SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException;
-
-   /**
-    * Returns the property corresponding to the specified key as a byte[].
-    *
-    * @throws ActiveMQPropertyConversionException if the value can not be converted to a byte[]
-    */
    byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
+   Message putStringProperty(SimpleString key, SimpleString value);
+
    /**
-    * @see #getBytesProperty(SimpleString)
+    * Returns the size of the <em>encoded</em> message.
     */
-   byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException;
+   int getEncodeSize();
 
    /**
     * Returns all the names of the properties for this message.
     */
    Set<SimpleString> getPropertyNames();
 
+
+
+   int getRefCount();
+
+   int incrementRefCount() throws Exception;
+
+   int decrementRefCount() throws Exception;
+
+   int incrementDurableRefCount();
+
+   int decrementDurableRefCount();
+
    /**
     * @return Returns the message in Map form, useful when encoding to JSON
     */
-   Map<String, Object> toMap();
+   default Map<String, Object> toMap() {
+      Map map = toPropertyMap();
+      map.put("messageID", getMessageID());
+      Object userID = getUserID();
+      if (getUserID() != null) {
+         map.put("userID", "ID:" + userID.toString());
+      }
+
+      map.put("address", getAddress());
+      map.put("durable", isDurable());
+      map.put("expiration", getExpiration());
+      map.put("timestamp", getTimestamp());
+      map.put("priority", (int)getPriority());
+
+      return map;
+   }
 
    /**
     * @return Returns the message properties in Map form, useful when encoding to JSON
     */
-   Map<String, Object> toPropertyMap();
+   default Map<String, Object> toPropertyMap() {
+      Map map = new HashMap<>();
+      for (SimpleString name : getPropertyNames()) {
+         map.put(name.toString(), getObjectProperty(name.toString()));
+      }
+      return map;
+   }
+
+
+   /** This should make you convert your message into Core format. */
+   ICoreMessage toCore();
+
+   int getMemoryEstimate();
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
new file mode 100644
index 0000000..64dd44d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class RefCountMessage implements Message {
+
+   private final AtomicInteger durableRefCount = new AtomicInteger();
+
+   private final AtomicInteger refCount = new AtomicInteger();
+
+   private RefCountMessageListener context;
+
+   @Override
+   public Message setContext(RefCountMessageListener context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public RefCountMessageListener getContext() {
+      return context;
+   }
+
+   @Override
+   public int getRefCount() {
+      return refCount.get();
+   }
+
+   @Override
+   public int incrementRefCount() throws Exception {
+      int count = refCount.incrementAndGet();
+      if (context != null) {
+         context.nonDurableUp(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int incrementDurableRefCount() {
+      int count = durableRefCount.incrementAndGet();
+      if (context != null) {
+         context.durableUp(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int decrementDurableRefCount() {
+      int count = durableRefCount.decrementAndGet();
+      if (context != null) {
+         context.durableDown(this, count);
+      }
+      return count;
+   }
+
+   @Override
+   public int decrementRefCount() throws Exception {
+      int count = refCount.decrementAndGet();
+      if (context != null) {
+         context.nonDurableDown(this, count);
+      }
+      return count;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
new file mode 100644
index 0000000..e68dffd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessageListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+/** If {@link Message#getContext()} is not null, the methods of this interface
+ *  will be called on it during refCount operations. */
+public interface RefCountMessageListener {
+
+   void durableUp(Message message, int durableCount);
+
+   void durableDown(Message message, int durableCount);
+
+   void nonDurableUp(Message message, int nonDurableCoun);
+
+   void nonDurableDown(Message message, int nonDurableCoun);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
index e87d365..67f2150 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java
@@ -19,14 +19,15 @@ package org.apache.activemq.artemis.api.core.client;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
 /**
  * A ClientMessage represents a message sent and/or received by ActiveMQ Artemis.
  */
-public interface ClientMessage extends Message {
+public interface ClientMessage extends ICoreMessage {
 
    /**
     * Returns the number of times this message was delivered.
@@ -123,135 +124,141 @@ public interface ClientMessage extends Message {
    ClientMessage setBodyInputStream(InputStream bodyInputStream);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Returns the body input stream for large messages.
+    * @return the body input stream
+    */
+   @Override
+   InputStream getBodyInputStream();
+
+   /**
+    * Returns the buffer used to write the body.
+    * @return the body buffer
+    */
+   @Override
+   ActiveMQBuffer getBodyBuffer();
+
+   /**
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBooleanProperty(SimpleString key, boolean value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBooleanProperty(String key, boolean value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putByteProperty(SimpleString key, byte value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putByteProperty(String key, byte value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBytesProperty(SimpleString key, byte[] value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putBytesProperty(String key, byte[] value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putShortProperty(SimpleString key, short value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putShortProperty(String key, short value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putCharProperty(SimpleString key, char value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putCharProperty(String key, char value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putIntProperty(SimpleString key, int value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putIntProperty(String key, int value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putLongProperty(SimpleString key, long value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putLongProperty(String key, long value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putFloatProperty(SimpleString key, float value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putFloatProperty(String key, float value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putDoubleProperty(SimpleString key, double value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putDoubleProperty(String key, double value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
-    */
-   @Override
-   ClientMessage putStringProperty(SimpleString key, SimpleString value);
-
-   /**
-    * Overridden from {@link Message} to enable fluent API
+    * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API
     */
    @Override
    ClientMessage putStringProperty(String key, String value);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Writes the given bytes to the body buffer; returns this message to enable fluent API
     */
-   @Override
    ClientMessage writeBodyBufferBytes(byte[] bytes);
 
    /**
-    * Overridden from {@link Message} to enable fluent API
+    * Writes the given string to the body buffer; returns this message to enable fluent API
     */
-   @Override
    ClientMessage writeBodyBufferString(String string);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
new file mode 100644
index 0000000..743583b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/BodyType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+public enum BodyType {
+   Undefined, Bytes, Map, Object, Stream, Text
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
new file mode 100644
index 0000000..f7821b9
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/encode/MessageBody.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.encode;
+
+import java.nio.ByteBuffer;
+
+public interface MessageBody {
+   Object getBody();
+
+   ByteBuffer getBodyArray();
+
+   BodyType getType();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 40211c1..946285d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.management;
 
 import javax.json.JsonArray;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 
 /**
  * Helper class to use ActiveMQ Artemis Core messages to manage server resources.
@@ -86,7 +88,7 @@ public final class ManagementHelper {
     * @param attribute    the name of the attribute
     * @see ResourceNames
     */
-   public static void putAttribute(final Message message, final String resourceName, final String attribute) {
+   public static void putAttribute(final ICoreMessage message, final String resourceName, final String attribute) {
       message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName));
       message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute));
    }
@@ -99,7 +101,7 @@ public final class ManagementHelper {
     * @param operationName the name of the operation to invoke on the resource
     * @see ResourceNames
     */
-   public static void putOperationInvocation(final Message message,
+   public static void putOperationInvocation(final ICoreMessage message,
                                              final String resourceName,
                                              final String operationName) throws Exception {
       ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[]) null);
@@ -114,7 +116,7 @@ public final class ManagementHelper {
     * @param parameters    the parameters to use to invoke the server resource
     * @see ResourceNames
     */
-   public static void putOperationInvocation(final Message message,
+   public static void putOperationInvocation(final ICoreMessage message,
                                              final String resourceName,
                                              final String operationName,
                                              final Object... parameters) throws Exception {
@@ -141,7 +143,7 @@ public final class ManagementHelper {
     * Used by ActiveMQ Artemis management service.
     */
    public static Object[] retrieveOperationParameters(final Message message) throws Exception {
-      SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
+      SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
       String jsonString = (sstring == null) ? null : sstring.toString();
 
       if (jsonString != null) {
@@ -170,7 +172,7 @@ public final class ManagementHelper {
    /**
     * Used by ActiveMQ Artemis management service.
     */
-   public static void storeResult(final Message message, final Object result) throws Exception {
+   public static void storeResult(final CoreMessage message, final Object result) throws Exception {
       String resultString;
 
       if (result != null) {
@@ -192,7 +194,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object[] getResults(final Message message) throws Exception {
+   public static Object[] getResults(final ICoreMessage message) throws Exception {
       SimpleString sstring = message.getBodyBuffer().readNullableSimpleString();
       String jsonString = (sstring == null) ? null : sstring.toString();
 
@@ -210,7 +212,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object getResult(final Message message) throws Exception {
+   public static Object getResult(final ICoreMessage message) throws Exception {
       return getResult(message, null);
    }
 
@@ -220,7 +222,7 @@ public final class ManagementHelper {
     * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}.
     * and the result will be a String corresponding to the server exception.
     */
-   public static Object getResult(final Message message, Class desiredType) throws Exception {
+   public static Object getResult(final ICoreMessage message, Class desiredType) throws Exception {
       Object[] res = ManagementHelper.getResults(message);
 
       if (res != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 900305f..b5d5474 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -20,18 +20,18 @@ import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 
 /**
  * A ResetLimitWrappedActiveMQBuffer
- * TODO: Move this to commons
  */
 public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper {
 
    private final int limit;
 
-   private MessageInternal message;
+   private Message message;
 
    /**
     * We need to turn of notifications of body changes on reset on the server side when dealing with AMQP conversions,
@@ -39,17 +39,17 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
     *
     * @param message
     */
-   public void setMessage(MessageInternal message) {
+   public void setMessage(Message message) {
       this.message = message;
    }
 
-   public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) {
+   public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final Message message) {
       // a wrapped inside a wrapper will increase the stack size.
       // we fixed this here due to some profiling testing
       this(limit, unwrap(buffer.byteBuf()).duplicate(), message);
    }
 
-   public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final MessageInternal message) {
+   public ResetLimitWrappedActiveMQBuffer(final int limit, final ByteBuf buffer, final Message message) {
       // a wrapped inside a wrapper will increase the stack size.
       // we fixed this here due to some profiling testing
       super(buffer);
@@ -67,7 +67,7 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
 
    private void changed() {
       if (message != null) {
-         message.bodyChanged();
+         message.messageChanged();
       }
    }
 
@@ -94,8 +94,6 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
 
    @Override
    public void resetReaderIndex() {
-      changed();
-
       buffer.readerIndex(limit);
    }
 
@@ -256,6 +254,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
       super.writeBytes(src);
    }
 
+
+   @Override
+   public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+      changed();
+
+      super.writeBytes(src, srcIndex, length);
+   }
+
    @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       changed();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 2b4ab7e..82af968 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -569,7 +569,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
    private void handleRegularMessage(ClientMessageInternal message) {
       if (message.getAddress() == null) {
-         message.setAddressTransient(queueInfo.getAddress());
+         message.setAddress(queueInfo.getAddress());
       }
 
       message.onReceipt(this);
@@ -625,7 +625,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       currentLargeMessageController.setLocal(true);
 
       //sets the packet
-      ActiveMQBuffer qbuff = clMessage.getBodyBuffer();
+      ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
       int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
       final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
index c3cbceb..cbfaf6f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientLargeMessageImpl.java
@@ -59,7 +59,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public int getEncodeSize() {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          return super.getEncodeSize();
       } else {
          return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize();
@@ -93,7 +93,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
          throw new RuntimeException(e.getMessage(), e);
       }
 
-      return bodyBuffer;
+      return writableBuffer;
    }
 
    @Override
@@ -108,7 +108,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public void saveToOutputStream(final OutputStream out) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          // The body was rebuilt on the client, so we need to behave as a regular message on this case
          super.saveToOutputStream(out);
       } else {
@@ -118,7 +118,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          super.setOutputStream(out);
       } else {
          largeMessageController.setOutputStream(out);
@@ -129,7 +129,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws ActiveMQException {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          return super.waitOutputStreamCompletion(timeMilliseconds);
       } else {
          return largeMessageController.waitCompletion(timeMilliseconds);
@@ -138,7 +138,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    @Override
    public void discardBody() {
-      if (bodyBuffer != null) {
+      if (writableBuffer != null) {
          super.discardBody();
       } else {
          largeMessageController.discardUnusedPackets();
@@ -146,17 +146,17 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
    }
 
    private void checkBuffer() throws ActiveMQException {
-      if (bodyBuffer == null) {
+      if (writableBuffer == null) {
 
          long bodySize = this.largeMessageSize + BODY_OFFSET;
          if (bodySize > Integer.MAX_VALUE) {
             bodySize = Integer.MAX_VALUE;
          }
-         createBody((int) bodySize);
+         initBuffer((int) bodySize);
 
-         bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
+         writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);
 
-         largeMessageController.saveBuffer(new ActiveMQOutputStream(bodyBuffer));
+         largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));
       }
    }
 
@@ -178,7 +178,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C
 
    public void retrieveExistingData(ClientMessageInternal clMessage) {
       this.messageID = clMessage.getMessageID();
-      this.address = clMessage.getAddress();
+      this.address = clMessage.getAddressSimpleString();
       this.setUserID(clMessage.getUserID());
       this.setFlowControlSize(clMessage.getFlowControlSize());
       this.setDeliveryCount(clMessage.getDeliveryCount());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 7bf8eb7..252ae86 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -28,14 +28,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUID;
 
 /**
  * A ClientMessageImpl
  */
-public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal {
+public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {
 
    // added this constant here so that the client package have no dependency on JMS
    public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME;
@@ -57,6 +59,35 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    public ClientMessageImpl() {
    }
 
+   protected ClientMessageImpl(ClientMessageImpl other) {
+      super(other);
+   }
+
+   @Override
+   public ClientMessageImpl setDurable(boolean durable) {
+      super.setDurable(durable);
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl setExpiration(long expiration) {
+      super.setExpiration(expiration);
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl setPriority(byte priority) {
+      super.setPriority(priority);
+      return this;
+   }
+
+   @Override
+   public ClientMessageImpl setUserID(UUID userID) {
+
+      return this;
+   }
+
+
    /*
     * Construct messages before sending
     */
@@ -66,12 +97,13 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
                             final long timestamp,
                             final byte priority,
                             final int initialMessageBufferSize) {
-      super(type, durable, expiration, timestamp, priority, initialMessageBufferSize);
+      this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
+           setPriority(priority).initBuffer(initialMessageBufferSize);
    }
 
    @Override
-   public boolean isServerMessage() {
-      return false;
+   public TypedProperties getProperties() {
+      return this.checkProperties();
    }
 
    @Override
@@ -108,6 +140,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       return this;
    }
 
+
+   @Override
+   public void checkCompletion() throws ActiveMQException {
+   }
+
    @Override
    public int getFlowControlSize() {
       if (flowControlSize < 0) {
@@ -141,7 +178,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
 
    @Override
    public String toString() {
-      return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+      return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + getProperties().toString() + "]";
    }
 
    @Override
@@ -189,7 +226,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
    }
 
    @Override
-   public BodyEncoder getBodyEncoder() throws ActiveMQException {
+   public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
       return new DecodingContext();
    }
 
@@ -307,15 +344,17 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
 
    @Override
    public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) {
-      return (ClientMessageImpl) super.writeBodyBufferBytes(bytes);
+      getBodyBuffer().writeBytes(bytes);
+      return this;
    }
 
    @Override
    public ClientMessageImpl writeBodyBufferString(String string) {
-      return (ClientMessageImpl) super.writeBodyBufferString(string);
+      getBodyBuffer().writeString(string);
+      return this;
    }
 
-   private final class DecodingContext implements BodyEncoder {
+   private final class DecodingContext implements LargeBodyEncoder {
 
       private DecodingContext() {
       }
@@ -347,9 +386,15 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
       @Override
       public int encode(final ActiveMQBuffer bufferOut, final int size) {
          byte[] bytes = new byte[size];
-         getWholeBuffer().readBytes(bytes);
+         buffer.readBytes(bytes);
          bufferOut.writeBytes(bytes, 0, size);
          return size;
       }
    }
+
+   @Override
+   public Message copy() {
+      return new ClientMessageImpl(this);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
index 07d4719..4b87878 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageInternal.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.utils.TypedProperties;
 
@@ -34,8 +33,6 @@ public interface ClientMessageInternal extends ClientMessage {
     */
    void setFlowControlSize(int flowControlSize);
 
-   void setAddressTransient(SimpleString address);
-
    void onReceipt(ClientConsumerInternal consumer);
 
    /**
@@ -44,4 +41,5 @@ public interface ClientMessageInternal extends ClientMessage {
    void discardBody();
 
    boolean isCompressed();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 1dfbe72..ce4a8a1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -23,12 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
 import org.apache.activemq.artemis.utils.DeflaterReader;
@@ -208,7 +208,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
    }
 
    private void doSend(SimpleString sendingAddress,
-                       final Message msg,
+                       final Message msgToSend,
                        final SendAcknowledgementHandler handler,
                        final boolean forceAsync) throws ActiveMQException {
       if (sendingAddress == null) {
@@ -217,7 +217,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
       session.startCall();
 
       try {
-         MessageInternal msgI = (MessageInternal) msg;
+         // In case we received a message from another protocol, we first need to convert it to core, as the ClientProducer only understands core
+         ICoreMessage msg = msgToSend.toCore();
 
          ClientProducerCredits theCredits;
 
@@ -225,8 +226,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
          // a note about the second check on the writerIndexSize,
          // If it's a server's message, it means this is being done through the bridge or some special consumer on the
          // server's on which case we can't' convert the message into large at the servers
-         if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
-            msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) {
+         if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() ||
+            msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
             isLarge = true;
          } else {
             isLarge = false;
@@ -248,27 +249,31 @@ public class ClientProducerImpl implements ClientProducerInternal {
          }
 
          if (groupID != null) {
-            msgI.putStringProperty(Message.HDR_GROUP_ID, groupID);
+            msg.putStringProperty(Message.HDR_GROUP_ID, groupID);
          }
 
-         final boolean sendBlockingConfig = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
+         final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
          final boolean forceAsyncOverride = handler != null;
          final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride;
 
          session.workDone();
 
          if (isLarge) {
-            largeMessageSend(sendBlocking, msgI, theCredits, handler);
+            largeMessageSend(sendBlocking, msg, theCredits, handler);
          } else {
-            sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler);
+            sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler);
          }
       } finally {
          session.endCall();
       }
    }
 
+   private InputStream getBodyInputStream(ICoreMessage msgI) {
+      return msgI.getBodyInputStream();
+   }
+
    private void sendRegularMessage(final SimpleString sendingAddress,
-                                   final MessageInternal msgI,
+                                   final ICoreMessage msgI,
                                    final boolean sendBlocking,
                                    final ClientProducerCredits theCredits,
                                    final SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -301,7 +306,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSend(final boolean sendBlocking,
-                                 final MessageInternal msgI,
+                                 final ICoreMessage msgI,
                                  final ClientProducerCredits credits,
                                  SendAcknowledgementHandler handler) throws ActiveMQException {
       logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking);
@@ -313,22 +318,22 @@ public class ClientProducerImpl implements ClientProducerInternal {
       }
 
       // msg.getBody() could be Null on LargeServerMessage
-      if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) {
-         msgI.getWholeBuffer().readerIndex(0);
+      if (getBodyInputStream(msgI) == null && msgI.getBuffer() != null) {
+         msgI.getBuffer().readerIndex(0);
       }
 
       InputStream input;
 
       if (msgI.isServerMessage()) {
          largeMessageSendServer(sendBlocking, msgI, credits, handler);
-      } else if ((input = msgI.getBodyInputStream()) != null) {
+      } else if ((input = getBodyInputStream(msgI)) != null) {
          largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler);
       } else {
          largeMessageSendBuffered(sendBlocking, msgI, credits, handler);
       }
    }
 
-   private void sendInitialLargeMessageHeader(MessageInternal msgI,
+   private void sendInitialLargeMessageHeader(Message msgI,
                                               ClientProducerCredits credits) throws ActiveMQException {
       int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
 
@@ -348,17 +353,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendServer(final boolean sendBlocking,
-                                       final MessageInternal msgI,
+                                       final ICoreMessage msgI,
                                        final ClientProducerCredits credits,
                                        SendAcknowledgementHandler handler) throws ActiveMQException {
       sendInitialLargeMessageHeader(msgI, credits);
 
-      BodyEncoder context = msgI.getBodyEncoder();
+      LargeBodyEncoder context = msgI.getBodyEncoder();
 
       final long bodySize = context.getLargeBodySize();
-
-      final int reconnectID = sessionContext.getReconnectID();
-
       context.open();
       try {
 
@@ -392,7 +394,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendBuffered(final boolean sendBlocking,
-                                         final MessageInternal msgI,
+                                         final ICoreMessage msgI,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
       msgI.getBodyBuffer().readerIndex(0);
@@ -407,7 +409,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
     * @throws ActiveMQException
     */
    private void largeMessageSendStreamed(final boolean sendBlocking,
-                                         final MessageInternal msgI,
+                                         final ICoreMessage msgI,
                                          final InputStream inputStreamParameter,
                                          final ClientProducerCredits credits,
                                          SendAcknowledgementHandler handler) throws ActiveMQException {
@@ -478,7 +480,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
                msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
 
                msgI.getBodyBuffer().writeBytes(buff, 0, pos);
-               sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler);
+               sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
                return;
             } else {
                if (!headerSent) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+
+   @Override
    public ByteBuffer toByteBuffer() {
       throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
+   /**
+    * Unsupported operation: this implementation rejects writes, so no bytes
+    * from {@code src} are transferred and no index is modified.
+    *
+    * @param src      the source buffer (ignored)
+    * @param srcIndex the start index within {@code src} (ignored)
+    * @param length   the number of bytes requested to be written (ignored)
+    * @throws IllegalAccessError always, carrying
+    *                            {@code READ_ONLY_ERROR_MESSAGE}
+    */
+   @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+   }
+
    public int writeBytes(final InputStream in, final int length) throws IOException {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
deleted file mode 100644
index baafaac..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/BodyEncoder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.message;
-
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-
-/**
- * Class used to encode message body into buffers.
- * <br>
- * Used to send large streams over the wire
- */
-public interface BodyEncoder {
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   void open() throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   void close() throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   int encode(ByteBuffer bufferRead) throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
-
-   /**
-    * This method must not be called directly by ActiveMQ Artemis clients.
-    */
-   long getLargeBodySize();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
new file mode 100644
index 0000000..8b96282
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+
+/**
+ * Interface used to encode message body into buffers.
+ * <br>
+ * Used to send large streams over the wire
+ */
+public interface LargeBodyEncoder {
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   void open() throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   void close() throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   int encode(ByteBuffer bufferRead) throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException;
+
+   /**
+    * This method must not be called directly by ActiveMQ Artemis clients.
+    */
+   long getLargeBodySize();
+}