You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:20 UTC

[27/36] activemq-artemis git commit: Fixing converters part I

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 1b7ed43..b5d2c86 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -34,7 +35,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@@ -234,7 +234,8 @@ public class AMQSession implements SessionCallback {
                           ServerConsumer consumer,
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
-      return theConsumer.handleDeliver(reference, message, deliveryCount);
+      // TODO: use encoders and proper conversions here
+      return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index c64c1ea..d0dff4d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -598,7 +598,7 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   protected void sendServerMessage(Message message, String txID) throws ActiveMQStompException {
+   protected void sendServerMessage(ICoreMessage message, String txID) throws ActiveMQStompException {
       StompSession stompSession = getSession(txID);
 
       if (stompSession.isNoLocal()) {
@@ -726,7 +726,7 @@ public final class StompConnection implements RemotingConnection {
       return SERVER_NAME;
    }
 
-   public StompFrame createStompMessage(Message serverMessage,
+   public StompFrame createStompMessage(ICoreMessage serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 2be0be4..39d2fe9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -109,13 +108,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
    }
 
    @Override
-   public MessageConverter getConverter() {
-      return null;
-   }
-
-   // ProtocolManager implementation --------------------------------
-
-   @Override
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
       StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index d2d42b7..ba706e5 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.zip.Inflater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -131,12 +132,11 @@ public class StompSession implements SessionCallback {
 
       //TODO-now: fix encoders
       LargeServerMessageImpl largeMessage = null;
-      Message newServerMessage = serverMessage;
+      ICoreMessage newServerMessage = serverMessage.toCore();
       try {
          StompSubscription subscription = subscriptions.get(consumer.getID());
          StompFrame frame = null;
          if (serverMessage.isLargeMessage()) {
-            newServerMessage = serverMessage.copy();
 
             largeMessage = (LargeServerMessageImpl) serverMessage;
             LargeBodyEncoder encoder = largeMessage.getBodyEncoder();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 8d13613..1e40d42 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -287,7 +288,7 @@ public abstract class VersionedStompFrameHandler {
       return response;
    }
 
-   public StompFrame createMessageFrame(Message serverMessage,
+   public StompFrame createMessageFrame(ICoreMessage serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
@@ -299,7 +300,7 @@ public abstract class VersionedStompFrameHandler {
       // TODO-now fix encoders
       ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
 
-      int bodyPos = ((CoreMessage)serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : ((CoreMessage)serverMessage).getEndOfBodyPosition();
+      int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
 
       buffer.readerIndex(CoreMessage.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index b14605d..58d18ef 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -48,7 +48,7 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
    }
 
    @Override
-   public StompFrame createMessageFrame(Message serverMessage,
+   public StompFrame createMessageFrame(ICoreMessage serverMessage,
                                         StompSubscription subscription,
                                         int deliveryCount) throws Exception {
       StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index 9f36b7f..0ecbae1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -48,8 +49,10 @@ public final class OpenTypeSupport {
    public static CompositeData convert(MessageReference ref) throws OpenDataException {
       CompositeType ct;
 
+      ICoreMessage message = ref.getMessage().toCore();
+
       Map<String, Object> fields;
-      byte type = ref.getMessage().getType();
+      byte type = message.getType();
 
       switch(type) {
          case Message.TEXT_TYPE:
@@ -128,8 +131,7 @@ public final class OpenTypeSupport {
 
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = new HashMap<>();
-         // TODO-now: fix this
-         Message m = ref.getMessage();
+         ICoreMessage m = ref.getMessage().toCore();
          rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
          if (m.getUserID() != null) {
             rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
@@ -270,7 +272,7 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         Message m = ref.getMessage();
+         ICoreMessage m = ref.getMessage().toCore();
          ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
          byte[] bytes = new byte[bodyCopy.readableBytes()];
          bodyCopy.readBytes(bytes);
@@ -291,7 +293,7 @@ public final class OpenTypeSupport {
       @Override
       public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
          Map<String, Object> rc = super.getFields(ref);
-         Message m = ref.getMessage();
+         ICoreMessage m = ref.getMessage().toCore();
          SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
          rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
          return rc;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index aabec54..271b85c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -255,7 +256,7 @@ public final class Page implements Comparable<Page> {
 
       if (messages != null) {
          for (PagedMessage msg : messages) {
-            if (msg.getMessage().isLargeMessage()) {
+            if (msg.getMessage() instanceof ICoreMessage &&  ((ICoreMessage)msg.getMessage()).isLargeMessage()) {
                LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage();
 
                // Remember, cannot call delete directly here

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index d50dd2e..7d43a2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.impl;
 import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -136,8 +137,8 @@ public class PagedMessageImpl implements PagedMessage {
       }
    }
 
-   private boolean isLargeMessage() {
-      return message.isLargeMessage();
+   public boolean isLargeMessage() {
+      return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 2295987..fc0885f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -85,7 +86,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.jboss.logging.Logger;
@@ -491,7 +491,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_SEND: {
                   SessionSendMessage message = (SessionSendMessage) packet;
                   requiresResponse = message.isRequiresResponse();
-                  message.getMessage().setProtocol(manager);
                   session.send(message.getMessage(), direct);
                   if (requiresResponse) {
                      response = new NullResponseMessage();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index ffaf2cb..cc81fbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -53,9 +54,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -111,16 +110,6 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
       return false;
    }
 
-   /**
-    * no need to implement this now
-    *
-    * @return
-    */
-   @Override
-   public MessageConverter getConverter() {
-      return null;
-   }
-
    @Override
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
       final Configuration config = server.getConfiguration();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 3a09e91..542d726 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -92,10 +92,9 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
+   public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount)  {
 
-      // TODO-now: fix this
-      Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
+      Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
 
       int size = 0;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index aa58a7d..6fcc802 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -17,11 +17,11 @@
 package org.apache.activemq.artemis.core.server;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 
-public interface LargeServerMessage extends ReplicatedLargeMessage, Message {
+public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
 
    @Override
    void addBytes(byte[] bytes) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index ce9c489..ab97b56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
 
 import java.util.List;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 /**
@@ -94,7 +95,7 @@ public interface ServerConsumer extends Consumer {
 
    void individualCancel(final long messageID, boolean failed) throws Exception;
 
-   void forceDelivery(long sequence);
+   void forceDelivery(long sequence) throws ActiveMQException;
 
    void setTransferring(boolean transferring);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a130437..e3c4744 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -507,14 +507,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public void forceDelivery(final long sequence) {
+   public void forceDelivery(final long sequence) throws ActiveMQException {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
 
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
          forcedDeliveryMessage.setAddress(messageQueue.getName());
 
-         callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
+         try {
+            callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+
       });
    }
 
@@ -1015,7 +1020,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * @param ref
     * @param message
     */
-   private void deliverStandardMessage(final MessageReference ref, final Message message) {
+   private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException {
       int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
       if (availableCredits != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 84ab636..29a2e47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -21,7 +21,9 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -42,8 +44,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -129,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
 
    Object[] getResources(Class<?> resourceType);
 
-   Message handleMessage(Message message) throws Exception;
+   ICoreMessage handleMessage(Message message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 5b2bf28..cda0a8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -365,10 +366,10 @@ public class ManagementServiceImpl implements ManagementService {
    }
 
    @Override
-   public Message handleMessage(Message message) throws Exception {
+   public ICoreMessage handleMessage(Message message) throws Exception {
       message = message.toCore();
       // a reply message is sent with the result stored in the message body.
-      Message reply = new CoreMessage(storageManager.generateID(), 512);
+      CoreMessage reply = new CoreMessage(storageManager.generateID(), 512);
 
       String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
index c885341..95036da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.transaction.impl;
 import javax.transaction.xa.Xid;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
-
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
 
@@ -32,7 +32,10 @@ public class CoreTransactionDetail extends TransactionDetail {
 
    @Override
    public String decodeMessageType(Message msg) {
-      int type = msg.getType();
+      if (!(msg instanceof ICoreMessage)) {
+         return "N/A";
+      }
+      int type = ((ICoreMessage)msg).getType();
       switch (type) {
          case Message.DEFAULT_TYPE: // 0
             return "Default";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index 3a5e2bf..a440e31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.spi.core.protocol;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 
-public interface MessageConverter {
+public interface MessageConverter<ProtocolMessage extends Message> {
 
-   Message inbound(Object messageInbound) throws Exception;
+   ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
 
-   Object outbound(Message messageOutbound, int deliveryCount) throws Exception;
+   ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index c2b7334..e29d74d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -22,9 +22,9 @@ import java.util.Map;
 import io.netty.channel.ChannelPipeline;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
@@ -53,14 +53,6 @@ public interface ProtocolManager<P extends BaseInterceptor> {
    boolean isProtocol(byte[] array);
 
    /**
-    * Gets the Message Converter towards ActiveMQ Artemis.
-    * Notice this being null means no need to convert
-    *
-    * @return
-    */
-   MessageConverter getConverter();
-
-   /**
     * If this protocols accepts connectoins without an initial handshake.
     * If true this protocol will be the failback case no other connections are made.
     * New designed protocols should always require a handshake. This is only useful for legacy protocols.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 92204ce..2f18c21 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -23,7 +23,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -44,8 +46,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -330,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Message handleMessage(Message message) throws Exception {
+      public ICoreMessage handleMessage(Message message) throws Exception {
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5b44572..522b7d0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -32,15 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.encode.BodyType;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.Consumer;
@@ -312,12 +310,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       final long id;
 
       @Override
-      public Message toCore() {
-         return this;
-      }
-
-      @Override
-      public ActiveMQBuffer getReadOnlyBodyBuffer() {
+      public CoreMessage toCore() {
          return null;
       }
 
@@ -389,10 +382,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public void messageChanged() {
 
       }
-      @Override
-      public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
-         return null;
-      }
 
       @Override
       public UUID getUserID() {
@@ -418,32 +407,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public ByteBuf getBuffer() {
          return null;
       }
-
-      @Override
-      public Object getProtocol() {
-         return null;
-      }
-
-      @Override
-      public Message setProtocol(Object protocol) {
-         return null;
-      }
-
-      @Override
-      public Object getBody() {
-         return null;
-      }
-
-      @Override
-      public BodyType getBodyType() {
-         return null;
-      }
-
-      @Override
-      public Message setBody(BodyType type, Object body) {
-         return null;
-      }
-
       @Override
       public Message setAddress(String address) {
          return null;
@@ -455,11 +418,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public byte getType() {
-         return 0;
-      }
-
-      @Override
       public boolean isDurable() {
          return false;
       }
@@ -515,11 +473,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public ActiveMQBuffer getBodyBuffer() {
-         return null;
-      }
-
-      @Override
       public Message putBooleanProperty(SimpleString key, boolean value) {
          return null;
       }
@@ -785,11 +738,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public Message setType(byte type) {
-         return null;
-      }
-
-      @Override
       public void receiveBuffer(ByteBuf buffer) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index aa64d9f..0bb177d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -2079,7 +2080,7 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected Message generateMessage(final long id) {
-      Message message = new CoreMessage(id, 1000);
+      ICoreMessage message = new CoreMessage(id, 1000);
 
       message.setMessageID(id);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 40f2ebd..16154c1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -35,9 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.api.core.encode.BodyType;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -339,6 +338,8 @@ public class AcknowledgeTest extends ActiveMQTestBase {
 
    class FakeMessageWithID extends RefCountMessage {
 
+      final long id;
+
       @Override
       public int getPersistSize() {
          return 0;
@@ -354,11 +355,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Message setProtocol(Object protocol) {
-         return this;
-      }
-
-      @Override
       public void reloadPersistence(ActiveMQBuffer record) {
 
       }
@@ -369,8 +365,8 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Message toCore() {
-         return this;
+      public ICoreMessage toCore() {
+         return null;
       }
 
       @Override
@@ -382,12 +378,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       public void sendBuffer(ByteBuf buffer, int count) {
 
       }
-
-      @Override
-      public LargeBodyEncoder getBodyEncoder() throws ActiveMQException {
-         return null;
-      }
-
       @Override
       public Message setUserID(Object userID) {
          return null;
@@ -404,18 +394,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public ActiveMQBuffer getReadOnlyBodyBuffer() {
-         return null;
-      }
-
-      final long id;
-
-      @Override
-      public Message setType(byte type) {
-         return null;
-      }
-
-      @Override
       public Message copy() {
          return null;
       }
@@ -495,26 +473,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Object getProtocol() {
-         return null;
-      }
-
-      @Override
-      public Object getBody() {
-         return null;
-      }
-
-      @Override
-      public BodyType getBodyType() {
-         return null;
-      }
-
-      @Override
-      public Message setBody(BodyType type, Object body) {
-         return null;
-      }
-
-      @Override
       public Message setAddress(String address) {
          return null;
       }
@@ -525,11 +483,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public byte getType() {
-         return 0;
-      }
-
-      @Override
       public boolean isDurable() {
          return false;
       }
@@ -585,11 +538,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public ActiveMQBuffer getBodyBuffer() {
-         return null;
-      }
-
-      @Override
       public Message putBooleanProperty(SimpleString key, boolean value) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index e2cf2a0..03e9ec3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -196,7 +196,7 @@ public class ConsumerTest extends ActiveMQTestBase {
          return;
       }
 
-      internalSend(true);
+      internalSend(true, true);
    }
 
    @Test
@@ -207,21 +207,38 @@ public class ConsumerTest extends ActiveMQTestBase {
          return;
       }
 
-      internalSend(false);
+      internalSend(false, true);
    }
 
-   public void internalSend(boolean amqp) throws Throwable {
+   @Test
+   public void testSendAMQPReceiveCore() throws Throwable {
+
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      internalSend(true, false);
+   }
 
-      ConnectionFactory factory;
+   @Test
+   public void testSendCoreReceiveAMQP() throws Throwable {
 
-      if (amqp) {
-         factory = new JmsConnectionFactory("amqp://localhost:61616");
-      } else {
-         factory = new ActiveMQConnectionFactory();
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
       }
 
+      internalSend(false, true);
+   }
+
+   public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable {
+
+      ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616");
+      ConnectionFactory factoryCore = new ActiveMQConnectionFactory();
+
 
-      Connection connection = factory.createConnection();
+      Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection();
 
       try {
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -232,7 +249,9 @@ public class ConsumerTest extends ActiveMQTestBase {
          long time = System.currentTimeMillis();
          int NUMBER_OF_MESSAGES = 100;
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
-            producer.send(session.createTextMessage("hello " + i));
+            TextMessage msg = session.createTextMessage("hello " + i);
+            msg.setIntProperty("mycount", i);
+            producer.send(msg);
          }
          long end = System.currentTimeMillis();
 
@@ -245,8 +264,9 @@ public class ConsumerTest extends ActiveMQTestBase {
             server.start();
          }
 
-         connection = factory.createConnection();
+         connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection();
          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(QUEUE.toString());
 
          connection.start();
 
@@ -255,6 +275,7 @@ public class ConsumerTest extends ActiveMQTestBase {
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
             TextMessage message = (TextMessage) consumer.receive(1000);
             Assert.assertNotNull(message);
+            Assert.assertEquals(i, message.getIntProperty("mycount"));
             Assert.assertEquals("hello " + i, message.getText());
          }
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 5e822eb..025d00a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -40,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
-
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -350,7 +349,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       ClientProducer producer = session.createProducer(ADDRESS);
 
-      Message clientFile = session.createMessage(true);
+      ClientMessage clientFile = session.createMessage(true);
       for (int i = 0; i < messageSize; i++) {
          clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java
index 70c5b22..23fa0a6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/ClientExitTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.integration.clientcrash;
 
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
@@ -63,7 +63,7 @@ public class ClientExitTest extends ClientTestBase {
 
       // read the message from the queue
 
-      Message message = consumer.receive(15000);
+      ClientMessage message = consumer.receive(15000);
 
       assertNotNull(message);
       assertEquals(ClientExitTest.MESSAGE_TEXT, message.getBodyBuffer().readString());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
index 1897bdd..85ed04f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java
@@ -28,7 +28,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
 import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.qpid.proton.message.Message;
@@ -53,8 +52,6 @@ public class MessageJournalTest extends ActiveMQTestBase {
 
       message.getBodyBuffer().writeByte((byte)'Z');
 
-      message.setProtocol(factory.createProtocolManager(server, null, null, null));
-
       server.getStorageManager().storeMessage(message);
 
       server.getStorageManager().stop();
@@ -95,7 +92,7 @@ public class MessageJournalTest extends ActiveMQTestBase {
 
       Message protonJMessage = Message.Factory.create();
 
-      AMQPMessage message = new AMQPMessage(protonJMessage, (ProtonProtocolManager)factory.createProtocolManager(server, null, null, null));
+      AMQPMessage message = new AMQPMessage(protonJMessage);
 
       message.setMessageID(333);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java
index 0719b38..3151408 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementHelperTest.java
@@ -20,7 +20,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@@ -48,7 +48,7 @@ public class ManagementHelperTest extends Assert {
       String operationName = RandomUtil.randomString();
       String param = RandomUtil.randomString();
       String[] params = new String[]{RandomUtil.randomString(), RandomUtil.randomString(), RandomUtil.randomString()};
-      Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
+      ClientMessage msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, param, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -135,7 +135,7 @@ public class ManagementHelperTest extends Assert {
 
       Object[] params = new Object[]{i, s, d, b, l, map, strArray, maps};
 
-      Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
+      ClientMessageImpl msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -201,7 +201,7 @@ public class ManagementHelperTest extends Assert {
 
       Object[] params = new Object[]{"hello", map};
 
-      Message msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
+      ClientMessageImpl msg = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1000);
       ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
 
       Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
index b6ea147..151341f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementServiceImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
@@ -50,7 +51,7 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
       server.start();
 
       // invoke attribute and operation on the server
-      Message message = new CoreMessage(1, 100);
+      CoreMessage message = new CoreMessage(1, 100);
       ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "createQueue", queue, address);
 
       Message reply = server.getManagementService().handleMessage(message);
@@ -66,10 +67,10 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
       server.start();
 
       // invoke attribute and operation on the server
-      Message message = new CoreMessage(1, 100);
+      CoreMessage message = new CoreMessage(1, 100);
       ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, "thereIsNoSuchOperation");
 
-      Message reply = server.getManagementService().handleMessage(message);
+      ICoreMessage reply = server.getManagementService().handleMessage(message);
 
       Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply));
       Assert.assertNotNull(ManagementHelper.getResult(reply));
@@ -83,10 +84,10 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
       server.start();
 
       // invoke attribute and operation on the server
-      Message message = new CoreMessage(1, 100);
+      ICoreMessage message = new CoreMessage(1, 100);
       ManagementHelper.putOperationInvocation(message, "Resouce.Does.Not.Exist", "toString");
 
-      Message reply = server.getManagementService().handleMessage(message);
+      ICoreMessage reply = server.getManagementService().handleMessage(message);
 
       Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply));
       Assert.assertNotNull(ManagementHelper.getResult(reply));
@@ -100,11 +101,11 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
       server.start();
 
       // invoke attribute and operation on the server
-      Message message = new CoreMessage(1, 100);
+      ICoreMessage message = new CoreMessage(1, 100);
 
       ManagementHelper.putAttribute(message, ResourceNames.BROKER, "started");
 
-      Message reply = server.getManagementService().handleMessage(message);
+      ICoreMessage reply = server.getManagementService().handleMessage(message);
 
       Assert.assertTrue(ManagementHelper.hasOperationSucceeded(reply));
       Assert.assertTrue((Boolean) ManagementHelper.getResult(reply));
@@ -118,11 +119,11 @@ public class ManagementServiceImplTest extends ActiveMQTestBase {
       server.start();
 
       // invoke attribute and operation on the server
-      Message message = new CoreMessage(1, 100);
+      ICoreMessage message = new CoreMessage(1, 100);
 
       ManagementHelper.putAttribute(message, ResourceNames.BROKER, "attribute.Does.Not.Exist");
 
-      Message reply = server.getManagementService().handleMessage(message);
+      ICoreMessage reply = server.getManagementService().handleMessage(message);
 
       Assert.assertFalse(ManagementHelper.hasOperationSucceeded(reply));
       Assert.assertNotNull(ManagementHelper.getResult(reply));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index 1f0d7e0..3675416 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -289,7 +288,7 @@ public class PagingSendTest extends ActiveMQTestBase {
       List<String> messageIds = new ArrayList<>();
       ClientProducer producer = session.createProducer(queueAddr);
       for (int i = 0; i < batchSize; i++) {
-         Message message = session.createMessage(true);
+         ClientMessage message = session.createMessage(true);
          message.getBodyBuffer().writeBytes(new byte[1024]);
          String id = UUID.randomUUID().toString();
          message.putStringProperty("id", id);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 1714947..48127d2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -75,7 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -5536,7 +5536,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       for (int i = 0; i < 100; i++) {
          Message msg = session.createMessage(true);
-         msg.getBodyBuffer().writeBytes(new byte[1024]);
+         msg.toCore().getBodyBuffer().writeBytes(new byte[1024]);
          prod.send(msg);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index cba3008..ec49ece 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -425,7 +425,7 @@ public class ScaleDownTest extends ClusterTestBase {
 
       while (!servers[0].getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) {
          for (int i = 0; i < CHUNK_SIZE; i++) {
-            Message message = session.createMessage(true);
+            ClientMessage message = session.createMessage(true);
             message.getBodyBuffer().writeBytes(new byte[1024]);
             producer.send(message);
             messageCount++;
@@ -463,7 +463,7 @@ public class ScaleDownTest extends ClusterTestBase {
 
       while (!servers[0].getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) {
          for (int i = 0; i < CHUNK_SIZE; i++) {
-            Message message = session.createMessage(true);
+            ClientMessage message = session.createMessage(true);
             message.getBodyBuffer().writeBytes(new byte[1024]);
             message.putIntProperty("order", i);
             producer.send(message);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java
index 89f7a60..141a6b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverOneWaySSLTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -41,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -127,7 +127,7 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase {
       ClientConsumer consumer = addClientConsumer(session.createConsumer(CoreClientOverOneWaySSLTest.QUEUE));
       session.start();
 
-      Message m = consumer.receive(1000);
+      ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
       Assert.assertEquals(text, m.getBodyBuffer().readString());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java
index 772e44d..11b3b0b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ssl/CoreClientOverTwoWaySSLTest.java
@@ -26,7 +26,6 @@ import io.netty.handler.ssl.SslHandler;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
 import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -151,7 +150,7 @@ public class CoreClientOverTwoWaySSLTest extends ActiveMQTestBase {
       ClientConsumer consumer = session.createConsumer(CoreClientOverTwoWaySSLTest.QUEUE);
       session.start();
 
-      Message m = consumer.receive(1000);
+      ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
       Assert.assertEquals(text, m.getBodyBuffer().readString());
    }
@@ -189,7 +188,7 @@ public class CoreClientOverTwoWaySSLTest extends ActiveMQTestBase {
       ClientConsumer consumer = session.createConsumer(CoreClientOverTwoWaySSLTest.QUEUE);
       session.start();
 
-      Message m = consumer.receive(1000);
+      ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
       Assert.assertEquals(text, m.getBodyBuffer().readString());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
index 2e0ffac..e88097a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
@@ -46,9 +47,9 @@ public class MessageImplTest extends ActiveMQTestBase {
          final long expiration = RandomUtil.randomLong();
          final long timestamp = RandomUtil.randomLong();
          final byte priority = RandomUtil.randomByte();
-         Message message1 = new ClientMessageImpl(type, durable, expiration, timestamp, priority, 100);
+         ICoreMessage message1 = new ClientMessageImpl(type, durable, expiration, timestamp, priority, 100);
 
-         Message message = message1;
+         ICoreMessage message = message1;
 
          Assert.assertEquals(type, message.getType());
          Assert.assertEquals(durable, message.isDurable());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index 37e33ed..847e8b7 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.paging.impl;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -207,7 +207,7 @@ public class PageTest extends ActiveMQTestBase {
       int initialNumberOfMessages = page.getNumberOfMessages();
 
       for (int i = 0; i < numberOfElements; i++) {
-         Message msg = new CoreMessage().initBuffer(100);
+         ICoreMessage msg = new CoreMessage().initBuffer(100);
 
          for (int j = 0; j < 10; j++) {
             msg.getBodyBuffer().writeByte((byte) 'b');

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index 654fd89..60f7a15 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
-import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -63,7 +63,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
 
       PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
 
-      Message msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));
+      ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));
 
       final RoutingContextImpl ctx = new RoutingContextImpl(null);
       Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock));
@@ -82,7 +82,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
 
       Assert.assertEquals(1, msgs.size());
 
-      ActiveMQTestBase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), (msgs.get(0).getMessage()).getBodyBuffer().toByteBuffer().array());
+      ActiveMQTestBase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), (msgs.get(0).getMessage()).toCore().getBodyBuffer().toByteBuffer().array());
 
       Assert.assertTrue(store.isPaging());
 
@@ -104,10 +104,10 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
       pageDirDir.mkdirs();
    }
 
-   protected Message createMessage(final long messageId,
-                                   final SimpleString destination,
-                                   final ByteBuffer buffer) {
-      Message msg = new CoreMessage(messageId, 200);
+   protected ICoreMessage createMessage(final long messageId,
+                                        final SimpleString destination,
+                                        final ByteBuffer buffer) {
+      ICoreMessage msg = new CoreMessage(messageId, 200);
 
       msg.setAddress(destination);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 905e550..af58a53 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -224,7 +224,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       for (int i = 0; i < numMessages; i++) {
          ActiveMQBuffer horn1 = buffers.get(i);
-         ActiveMQBuffer horn2 = msg.get(i).getMessage().getBodyBuffer();
+         ActiveMQBuffer horn2 = msg.get(i).getMessage().toCore().getBodyBuffer();
          horn1.resetReaderIndex();
          horn2.resetReaderIndex();
          for (int j = 0; j < horn1.writerIndex(); j++) {
@@ -290,7 +290,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
          for (int i = 0; i < 5; i++) {
             Assert.assertEquals(sequence++, msg.get(i).getMessage().getMessageID());
-            ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i).getMessage().getBodyBuffer());
+            ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i).getMessage().toCore().getBodyBuffer());
          }
       }
 
@@ -341,7 +341,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       Assert.assertEquals(1L, msgs.get(0).getMessage().getMessageID());
 
-      ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().getBodyBuffer());
+      ActiveMQTestBase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().toCore().getBodyBuffer());
 
       Assert.assertEquals(1, store.getNumberOfPages());
 
@@ -485,14 +485,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
          page.close();
 
          for (PagedMessage msg : msgs) {
-            long id = msg.getMessage().getBodyBuffer().readLong();
-            msg.getMessage().getBodyBuffer().resetReaderIndex();
+            long id = msg.getMessage().toCore().getBodyBuffer().readLong();
+            msg.getMessage().toCore().getBodyBuffer().resetReaderIndex();
 
             Message msgWritten = buffers.remove(id);
             buffers2.put(id, msg.getMessage());
             Assert.assertNotNull(msgWritten);
             Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddressSimpleString());
-            ActiveMQTestBase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage().getBodyBuffer());
+            ActiveMQTestBase.assertEqualsBuffers(10, msgWritten.toCore().getBodyBuffer(), msg.getMessage().toCore().getBodyBuffer());
          }
       }
 
@@ -547,11 +547,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
          for (PagedMessage msg : msgs) {
 
-            long id = msg.getMessage().getBodyBuffer().readLong();
+            long id = msg.getMessage().toCore().getBodyBuffer().readLong();
             Message msgWritten = buffers2.remove(id);
             Assert.assertNotNull(msgWritten);
             Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddressSimpleString());
-            ActiveMQTestBase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(), msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage().getBodyBuffer().toByteBuffer().array());
+            ActiveMQTestBase.assertEqualsByteArrays(msgWritten.toCore().getBodyBuffer().writerIndex(), msgWritten.toCore().getBodyBuffer().toByteBuffer().array(), msg.getMessage().toCore().getBodyBuffer().toByteBuffer().array());
          }
       }
 
@@ -560,8 +560,8 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       lastPage.close();
       Assert.assertEquals(1, lastMessages.size());
 
-      lastMessages.get(0).getMessage().getBodyBuffer().resetReaderIndex();
-      Assert.assertEquals(lastMessages.get(0).getMessage().getBodyBuffer().readLong(), lastMessageId);
+      lastMessages.get(0).getMessage().toCore().getBodyBuffer().resetReaderIndex();
+      Assert.assertEquals(lastMessages.get(0).getMessage().toCore().getBodyBuffer().readLong(), lastMessageId);
 
       Assert.assertEquals(0, buffers2.size());
 
@@ -739,11 +739,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       };
    }
 
-   private Message createMessage(final long id,
+   private CoreMessage createMessage(final long id,
                                        final PagingStore store,
                                        final SimpleString destination,
                                        final ActiveMQBuffer buffer) {
-      Message msg = new CoreMessage(id, 50 + buffer.capacity());
+      CoreMessage msg = new CoreMessage(id, 50 + buffer.capacity());
 
       msg.setAddress(destination);