You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:05 UTC

[12/36] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2943f15..ed9a396 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -43,13 +43,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
+
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.QueueStatus;
@@ -71,7 +72,6 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -440,12 +440,12 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception {
+   public void route(final Message message, final RoutingContext context) throws Exception {
       context.addQueue(address, this);
    }
 
    @Override
-   public void routeWithAck(ServerMessage message, RoutingContext context) {
+   public void routeWithAck(Message message, RoutingContext context) {
       context.addQueueWithAck(address, this);
    }
 
@@ -922,7 +922,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public boolean hasMatchingConsumer(final ServerMessage message) {
+   public boolean hasMatchingConsumer(final Message message) {
       for (ConsumerHolder holder : consumerList) {
          Consumer consumer = holder.consumer;
 
@@ -1055,7 +1055,7 @@ public class QueueImpl implements Queue {
          pageSubscription.ack((PagedReference) ref);
          postAcknowledge(ref);
       } else {
-         ServerMessage message = ref.getMessage();
+         Message message = ref.getMessage();
 
          boolean durableRef = message.isDurable() && durable;
 
@@ -1087,7 +1087,7 @@ public class QueueImpl implements Queue {
 
          getRefsOperation(tx).addAck(ref);
       } else {
-         ServerMessage message = ref.getMessage();
+         Message message = ref.getMessage();
 
          boolean durableRef = message.isDurable() && durable;
 
@@ -1111,7 +1111,7 @@ public class QueueImpl implements Queue {
 
    @Override
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
-      ServerMessage message = ref.getMessage();
+      Message message = ref.getMessage();
 
       if (message.isDurable() && durable) {
          tx.setContainsPersistent();
@@ -1216,11 +1216,11 @@ public class QueueImpl implements Queue {
       return expiryAddress;
    }
 
-   private SimpleString extractAddress(ServerMessage message) {
-      if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
-         return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+   private SimpleString extractAddress(Message message) {
+      if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
+         return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString());
       } else {
-         return message.getAddress();
+         return message.getAddressSimpleString();
       }
    }
 
@@ -1244,7 +1244,9 @@ public class QueueImpl implements Queue {
       List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
       if (scheduledMessages != null && scheduledMessages.size() > 0) {
          for (MessageReference ref : scheduledMessages) {
-            ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
+
+            // TODO-now remove this, use something on Reference
+//            ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
             ref.setScheduledDeliveryTime(0);
          }
          this.addHead(scheduledMessages, true);
@@ -2274,7 +2276,7 @@ public class QueueImpl implements Queue {
    public boolean checkRedelivery(final MessageReference reference,
                                   final long timeBase,
                                   final boolean ignoreRedeliveryDelay) throws Exception {
-      ServerMessage message = reference.getMessage();
+      Message message = reference.getMessage();
 
       if (internalQueue) {
          if (logger.isTraceEnabled()) {
@@ -2337,7 +2339,7 @@ public class QueueImpl implements Queue {
                      final boolean expiry,
                      final boolean rejectDuplicate,
                      final long... queueIDs) throws Exception {
-      ServerMessage copyMessage = makeCopy(ref, expiry);
+      Message copyMessage = makeCopy(ref, expiry);
 
       copyMessage.setAddress(toAddress);
 
@@ -2346,7 +2348,7 @@ public class QueueImpl implements Queue {
          for (long id : queueIDs) {
             buffer.putLong(id);
          }
-         copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
       }
 
       postOffice.route(copyMessage, tx, false, rejectDuplicate);
@@ -2358,16 +2360,17 @@ public class QueueImpl implements Queue {
    private void moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
                                      final MessageReference ref) throws Exception {
-      ServerMessage copyMessage = makeCopy(ref, false, false);
+      Message copyMessage = makeCopy(ref, false, false);
 
       byte[] oldRouteToIDs = null;
       String targetNodeID;
       Binding targetBinding;
 
+      // TODO-now: this needs to go away
       // remove the old route
       for (SimpleString propName : copyMessage.getPropertyNames()) {
-         if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
-            oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName);
+         if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
+            oldRouteToIDs = (byte[]) copyMessage.removeProperty(propName.toString());
             final String hashcodeToString = oldRouteToIDs.toString(); // don't use Arrays.toString(..) here
             logger.debug("Removed property from message: " + propName + " = " + hashcodeToString + " (" + ByteBuffer.wrap(oldRouteToIDs).getLong() + ")");
 
@@ -2420,7 +2423,7 @@ public class QueueImpl implements Queue {
    }
 
    private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix,
-                                                     ServerMessage copyMessage,
+                                                     Message copyMessage,
                                                      long oldQueueID) {
       String targetNodeID = null;
       Binding targetBinding = null;
@@ -2440,7 +2443,7 @@ public class QueueImpl implements Queue {
                // parse the queue name of the remote queue binding to determine the node ID
                String temp = remoteQueueBinding.getQueue().getName().toString();
                targetNodeID = temp.substring(temp.lastIndexOf(".") + 1);
-               logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddress() + " on node " + targetNodeID);
+               logger.debug("Message formerly destined for " + oldQueueName + " with ID: " + oldQueueID + " on address " + copyMessage.getAddressSimpleString() + " on node " + targetNodeID);
 
                // now that we have the name of the queue we need to look through all the bindings again to find the new remote queue binding
                for (Map.Entry<SimpleString, Binding> entry2 : postOffice.getAllBindings().entrySet()) {
@@ -2468,14 +2471,14 @@ public class QueueImpl implements Queue {
       return new Pair<>(targetNodeID, targetBinding);
    }
 
-   private ServerMessage makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
+   private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
       return makeCopy(ref, expiry, true);
    }
 
-   private ServerMessage makeCopy(final MessageReference ref,
+   private Message makeCopy(final MessageReference ref,
                                   final boolean expiry,
                                   final boolean copyOriginalHeaders) throws Exception {
-      ServerMessage message = ref.getMessage();
+      Message message = ref.getMessage();
       /*
        We copy the message and send that to the dla/expiry queue - this is
        because otherwise we may end up with a ref with the same message id in the
@@ -2487,7 +2490,15 @@ public class QueueImpl implements Queue {
 
       long newID = storageManager.generateID();
 
-      ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, ref, expiry, copyOriginalHeaders);
+      Message copy = message.copy(newID);
+
+      if (copyOriginalHeaders) {
+         copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getAddress().toString() : null);
+      }
+
+      if (expiry) {
+         copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis());
+      }
 
       return copy;
    }
@@ -2549,7 +2560,7 @@ public class QueueImpl implements Queue {
          tx = new TransactionImpl(storageManager);
       }
 
-      ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
+      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
 
       copyMessage.setAddress(address);
 
@@ -2719,7 +2730,7 @@ public class QueueImpl implements Queue {
          return;
       }
 
-      ServerMessage message;
+      Message message;
 
       try {
          message = ref.getMessage();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 8e3a94b..0f3da07 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,12 +22,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -122,7 +122,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          try {
             Transaction ackedTX = new TransactionImpl(storageManager);
             for (MessageReference ref : ackedRefs) {
-               ServerMessage message = ref.getMessage();
+               Message message = ref.getMessage();
                if (message.isDurable()) {
                   int durableRefCount = message.incrementDurableRefCount();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index a5f96b1..4eda3ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.activemq.artemis.api.core.Message;
+
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -39,7 +40,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -54,7 +55,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -193,7 +193,7 @@ public class ScaleDownHandler {
                      buffer.putLong(queueID);
                   }
 
-                  message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+                  message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
 
                   if (logger.isDebugEnabled()) {
                      if (messageReference.isPaged()) {
@@ -203,7 +203,8 @@ public class ScaleDownHandler {
                      }
                   }
 
-                  producer.send(address, message);
+                  // TODO-no: fix encoding
+                  producer.send(address, (Message)message);
                   messageCount++;
 
                   messagesIterator.remove();
@@ -264,11 +265,11 @@ public class ScaleDownHandler {
                byte[] oldRouteToIDs = null;
 
                List<SimpleString> propertiesToRemove = new ArrayList<>();
-               message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+               message.removeProperty(Message.HDR_ROUTE_TO_IDS.toString());
                for (SimpleString propName : message.getPropertyNames()) {
-                  if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
+                  if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
                      if (propName.toString().endsWith(propertyEnd)) {
-                        oldRouteToIDs = message.getBytesProperty(propName);
+                        oldRouteToIDs = message.getBytesProperty(propName.toString());
                      }
                      propertiesToRemove.add(propName);
                   }
@@ -277,17 +278,19 @@ public class ScaleDownHandler {
                // TODO: what if oldRouteToIDs == null ??
 
                for (SimpleString propertyToRemove : propertiesToRemove) {
-                  message.removeProperty(propertyToRemove);
+                  message.removeProperty(propertyToRemove.toString());
                }
 
                if (queueOnTarget) {
-                  message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs);
+                  message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), oldRouteToIDs);
                } else {
-                  message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs);
+                  message.putBytesProperty(Message.HDR_SCALEDOWN_TO_IDS.toString(), oldRouteToIDs);
                }
 
                logger.debug("Scaling down message " + message + " from " + address + " to " + message.getAddress() + " on node " + targetNodeId);
-               producer.send(message.getAddress(), message);
+
+               // TODO-now: fix encode
+               producer.send(message.getAddress(), (Message)message);
 
                messageCount++;
 
@@ -322,13 +325,13 @@ public class ScaleDownHandler {
          List<TransactionOperation> allOperations = transaction.getAllOperations();
 
          // Get the information of the Prepared TXs so it could replay the TXs
-         Map<ServerMessage, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
+         Map<Message, Pair<List<Long>, List<Long>>> queuesToSendTo = new HashMap<>();
          for (TransactionOperation operation : allOperations) {
             if (operation instanceof PostOfficeImpl.AddOperation) {
                PostOfficeImpl.AddOperation addOperation = (PostOfficeImpl.AddOperation) operation;
                List<MessageReference> refs = addOperation.getRelatedMessageReferences();
                for (MessageReference ref : refs) {
-                  ServerMessage message = ref.getMessage();
+                  Message message = ref.getMessage();
                   Queue queue = ref.getQueue();
                   long queueID;
                   String queueName = queue.getName().toString();
@@ -336,7 +339,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -350,7 +353,7 @@ public class ScaleDownHandler {
                RefsOperation refsOperation = (RefsOperation) operation;
                List<MessageReference> refs = refsOperation.getReferencesToAcknowledge();
                for (MessageReference ref : refs) {
-                  ServerMessage message = ref.getMessage();
+                  Message message = ref.getMessage();
                   Queue queue = ref.getQueue();
                   long queueID;
                   String queueName = queue.getName().toString();
@@ -358,7 +361,7 @@ public class ScaleDownHandler {
                   if (queueIDs.containsKey(queueName)) {
                      queueID = queueIDs.get(queueName);
                   } else {
-                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress());
+                     queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddressSimpleString());
                      queueIDs.put(queueName, queueID);  // store it so we don't have to look it up every time
                   }
                   Pair<List<Long>, List<Long>> queueIds = queuesToSendTo.get(message);
@@ -373,23 +376,23 @@ public class ScaleDownHandler {
          }
 
          ClientProducer producer = session.createProducer();
-         for (Map.Entry<ServerMessage, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
+         for (Map.Entry<Message, Pair<List<Long>, List<Long>>> entry : queuesToSendTo.entrySet()) {
             List<Long> ids = entry.getValue().getA();
             ByteBuffer buffer = ByteBuffer.allocate(ids.size() * 8);
             for (Long id : ids) {
                buffer.putLong(id);
             }
-            ServerMessage message = entry.getKey();
-            message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+            Message message = entry.getKey();
+            message.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
             ids = entry.getValue().getB();
             if (ids.size() > 0) {
                buffer = ByteBuffer.allocate(ids.size() * 8);
                for (Long id : ids) {
                   buffer.putLong(id);
                }
-               message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS, buffer.array());
+               message.putBytesProperty(Message.HDR_ROUTE_TO_ACK_IDS.toString(), buffer.array());
             }
-            producer.send(message.getAddress(), message);
+            producer.send(message.getAddressSimpleString().toString(), message);
          }
          session.end(xid, XAResource.TMSUCCESS);
          session.prepare(xid);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index bcc6df1..a130437 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -31,12 +31,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.message.BodyEncoder;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -48,7 +50,6 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -205,7 +206,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       this.creationTime = System.currentTimeMillis();
 
-
       if (browseOnly) {
          browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
       } else {
@@ -341,7 +341,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             }
             return HandleStatus.BUSY;
          }
-         final ServerMessage message = ref.getMessage();
+         final Message message = ref.getMessage();
 
          if (filter != null && !filter.match(message)) {
             if (logger.isTraceEnabled()) {
@@ -400,7 +400,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    @Override
    public void proceedDeliver(MessageReference reference) throws Exception {
       try {
-         ServerMessage message = reference.getMessage();
+         Message message = reference.getMessage();
 
          if (message.isLargeMessage() && supportLargeMessage) {
             if (largeMessageDeliverer == null) {
@@ -508,16 +508,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     */
    @Override
    public void forceDelivery(final long sequence) {
-      forceDelivery(sequence, new Runnable() {
-         @Override
-         public void run() {
-            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+      forceDelivery(sequence, () -> {
+         Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
 
-            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-            forcedDeliveryMessage.setAddress(messageQueue.getName());
+         forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+         forcedDeliveryMessage.setAddress(messageQueue.getName());
 
-            callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
-         }
+         callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
       });
    }
 
@@ -1018,7 +1015,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * @param ref
     * @param message
     */
-   private void deliverStandardMessage(final MessageReference ref, final ServerMessage message) {
+   private void deliverStandardMessage(final MessageReference ref, final Message message) {
       int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
 
       if (availableCredits != null) {
@@ -1070,7 +1067,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
        */
       private long positionPendingLargeMessage;
 
-      private BodyEncoder context;
+      private LargeBodyEncoder context;
 
       private LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
          largeMessage = message;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
deleted file mode 100644
index 39e77ca..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.impl;
-
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.MemorySize;
-import org.apache.activemq.artemis.utils.TypedProperties;
-
-public class ServerMessageImpl extends MessageImpl implements ServerMessage {
-
-   private final AtomicInteger durableRefCount = new AtomicInteger();
-
-   private final AtomicInteger refCount = new AtomicInteger();
-
-   private PagingStore pagingStore;
-
-   private static final int memoryOffset;
-
-   private boolean persisted = false;
-
-   static {
-      // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties
-      // Note, it is only an estimate, it's not possible to be entirely sure with Java
-      // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof
-      // The value is somewhat higher on 64 bit architectures, probably due to different alignment
-
-      if (MemorySize.is64bitArch()) {
-         memoryOffset = 352;
-      } else {
-         memoryOffset = 232;
-      }
-   }
-
-   /*
-    * Constructor for when reading from network
-    */
-   public ServerMessageImpl() {
-   }
-
-   /*
-    * Construct a MessageImpl from storage, or notification, or before routing
-    */
-   public ServerMessageImpl(final long messageID, final int initialMessageBufferSize) {
-      super(initialMessageBufferSize);
-
-      this.messageID = messageID;
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected ServerMessageImpl(final ServerMessageImpl other) {
-      super(other);
-   }
-
-   /*
-    * Copy constructor
-    */
-   protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties) {
-      super(other, properties);
-   }
-
-   @Override
-   public boolean isServerMessage() {
-      return true;
-   }
-
-   @Override
-   public ServerMessageImpl setMessageID(final long id) {
-      messageID = id;
-      return this;
-   }
-
-   @Override
-   public MessageReference createReference(final Queue queue) {
-      MessageReference ref = new MessageReferenceImpl(this, queue);
-
-      return ref;
-   }
-
-   @Override
-   public boolean hasInternalProperties() {
-      return properties.hasInternalProperties();
-   }
-
-   @Override
-   public int incrementRefCount() throws Exception {
-      int count = refCount.incrementAndGet();
-
-      if (pagingStore != null) {
-         if (count == 1) {
-            pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
-         } else {
-            pagingStore.addSize(MessageReferenceImpl.getMemoryEstimate());
-         }
-      }
-
-      return count;
-   }
-
-   @Override
-   public int decrementRefCount() throws Exception {
-      int count = refCount.decrementAndGet();
-
-      if (count < 0) {
-         // this could happen on paged messages since they are not routed and incrementRefCount is never called
-         return count;
-      }
-
-      if (pagingStore != null) {
-         if (count == 0) {
-            pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
-
-            if (buffer != null) {
-               // release the buffer now
-               buffer.byteBuf().release();
-            }
-         } else {
-            pagingStore.addSize(-MessageReferenceImpl.getMemoryEstimate());
-         }
-      }
-
-      return count;
-   }
-
-   @Override
-   public int incrementDurableRefCount() {
-      return durableRefCount.incrementAndGet();
-   }
-
-   @Override
-   public int decrementDurableRefCount() {
-      return durableRefCount.decrementAndGet();
-   }
-
-   @Override
-   public int getRefCount() {
-      return refCount.get();
-   }
-
-   @Override
-   public boolean isLargeMessage() {
-      return false;
-   }
-
-   private volatile int memoryEstimate = -1;
-
-   @Override
-   public int getMemoryEstimate() {
-      if (memoryEstimate == -1) {
-         memoryEstimate = ServerMessageImpl.memoryOffset + buffer.capacity() + properties.getMemoryOffset();
-      }
-
-      return memoryEstimate;
-   }
-
-   @Override
-   public ServerMessage copy(final long newID) {
-      ServerMessage m = new ServerMessageImpl(this);
-
-      m.setMessageID(newID);
-
-      return m;
-   }
-
-   @Override
-   public ServerMessage copy() {
-      // This is a simple copy, used only to avoid changing original properties
-      return new ServerMessageImpl(this);
-   }
-
-   public ServerMessage makeCopyForExpiryOrDLA(final long newID,
-                                               MessageReference originalReference,
-                                               final boolean expiry) throws Exception {
-      return makeCopyForExpiryOrDLA(newID, originalReference, expiry, true);
-   }
-
-   @Override
-   public ServerMessage makeCopyForExpiryOrDLA(final long newID,
-                                               MessageReference originalReference,
-                                               final boolean expiry,
-                                               final boolean copyOriginalHeaders) throws Exception {
-      /*
-       We copy the message and send that to the dla/expiry queue - this is
-       because otherwise we may end up with a ref with the same message id in the
-       queue more than once which would barf - this might happen if the same message had been
-       expire from multiple subscriptions of a topic for example
-       We set headers that hold the original message address, expiry time
-       and original message id
-      */
-
-      ServerMessage copy = copy(newID);
-
-      if (copyOriginalHeaders) {
-         copy.setOriginalHeaders(this, originalReference, expiry);
-      }
-
-      return copy;
-   }
-
-   @Override
-   public void setOriginalHeaders(final ServerMessage other,
-                                  final MessageReference originalReference,
-                                  final boolean expiry) {
-      SimpleString originalQueue = other.getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
-
-      if (originalQueue != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-      } else if (originalReference != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName());
-      }
-
-      if (other.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS));
-
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID));
-      } else {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress());
-
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID());
-      }
-
-      // reset expiry
-      setExpiration(0);
-
-      if (expiry) {
-         long actualExpiryTime = System.currentTimeMillis();
-
-         putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
-      }
-
-      bufferValid = false;
-   }
-
-   @Override
-   public void setPagingStore(final PagingStore pagingStore) {
-      this.pagingStore = pagingStore;
-
-      // On the server side, we reset the address to point to the instance of address in the paging store
-      // Otherwise each message would have its own copy of the address String which would take up more memory
-      address = pagingStore.getAddress();
-   }
-
-   @Override
-   public synchronized void forceAddress(final SimpleString address) {
-      this.address = address;
-      bufferValid = false;
-   }
-
-   @Override
-   public PagingStore getPagingStore() {
-      return pagingStore;
-   }
-
-   @Override
-   public boolean storeIsPaging() {
-      if (pagingStore != null) {
-         return pagingStore.isPaging();
-      } else {
-         return false;
-      }
-   }
-
-   @Override
-   public String toString() {
-      try {
-         return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() +
-            ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
-            ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
-      } catch (Throwable e) {
-         return "ServerMessage[messageID=" + messageID + "]";
-      }
-   }
-
-   private static String toDate(long timestamp) {
-      if (timestamp == 0) {
-         return "0";
-      } else {
-         return new java.util.Date(timestamp).toString();
-      }
-
-   }
-
-   @Override
-   public InputStream getBodyInputStream() {
-      return null;
-   }
-
-   // Encoding stuff
-
-   @Override
-   public void encodeMessageIDToBuffer() {
-      // We first set the message id - this needs to be set on the buffer since this buffer will be re-used
-
-      buffer.setLong(buffer.getInt(MessageImpl.BUFFER_HEADER_SPACE) + DataConstants.SIZE_INT, messageID);
-   }
-
-   @Override
-   public byte[] getDuplicateIDBytes() {
-      Object duplicateID = getDuplicateProperty();
-
-      if (duplicateID == null) {
-         return null;
-      } else {
-         if (duplicateID instanceof SimpleString) {
-            return ((SimpleString) duplicateID).getData();
-         } else {
-            return (byte[]) duplicateID;
-         }
-      }
-   }
-
-   @Override
-   public Object getDuplicateProperty() {
-      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 52ecda1..27fbdcb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -46,7 +47,6 @@ import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -66,14 +66,13 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -155,9 +154,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private final SimpleString managementAddress;
 
-   // The current currentLargeMessage being processed
-   private volatile LargeServerMessage currentLargeMessage;
-
    protected final RoutingContext routingContext = new RoutingContextImpl(null);
 
    protected final SessionCallback callback;
@@ -187,6 +183,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private Map<SimpleString, RoutingType> prefixes;
 
+   private Set<Closeable> closeables;
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -273,6 +271,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public void addCloseable(Closeable closeable) {
+      if (closeables == null) {
+         closeables = new HashSet<>();
+      }
+      this.closeables.add(closeable);
+   }
+
+   @Override
    public void disableSecurity() {
       this.securityEnabled = false;
    }
@@ -376,11 +382,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       consumers.clear();
 
-      if (currentLargeMessage != null) {
-         try {
-            currentLargeMessage.deleteFile();
-         } catch (Throwable error) {
-            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+      if (closeables != null) {
+         for (Closeable closeable : closeables) {
+            closeable.close(failed);
          }
       }
 
@@ -1272,30 +1276,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
-   public void sendLarge(final MessageInternal message) throws Exception {
-      // need to create the LargeMessage before continue
-      long id = storageManager.generateID();
-
-      LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
-
-      if (logger.isTraceEnabled()) {
-         logger.trace("sendLarge::" + largeMsg);
-      }
-
-      if (currentLargeMessage != null) {
-         ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
-      }
-
-      currentLargeMessage = largeMsg;
-   }
-
-   @Override
-   public RoutingStatus send(final ServerMessage message, final boolean direct) throws Exception {
+   public RoutingStatus send(final Message message, final boolean direct) throws Exception {
       return send(message, direct, false);
    }
 
    @Override
-   public RoutingStatus send(final ServerMessage message,
+   public RoutingStatus send(final Message message,
                              final boolean direct,
                              boolean noAutoCreateQueue) throws Exception {
       return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
@@ -1303,7 +1289,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public RoutingStatus send(Transaction tx,
-                             final ServerMessage message,
+                             final Message message,
                              final boolean direct,
                              boolean noAutoCreateQueue) throws Exception {
 
@@ -1319,19 +1305,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       //case the id header already generated.
       if (!message.isLargeMessage()) {
          long id = storageManager.generateID();
-
+         // This will re-encode the message
          message.setMessageID(id);
-         message.encodeMessageIDToBuffer();
       }
 
       if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
          message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
       }
 
-      SimpleString address = removePrefix(message.getAddress());
+      SimpleString address = removePrefix(message.getAddressSimpleString());
 
       // In case the prefix was removed, we also need to update the message
-      if (address != message.getAddress()) {
+      if (address != message.getAddressSimpleString()) {
          message.setAddress(address);
       }
 
@@ -1340,14 +1325,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       if (address == null) {
-         if (message.isDurable()) {
-            // We need to force a re-encode when the message gets persisted or when it gets reloaded
-            // it will have no address
-            message.setAddress(defaultAddress);
-         } else {
-            // We don't want to force a re-encode when the message gets sent to the consumer
-            message.setAddressTransient(defaultAddress);
-         }
+         // We don't want to force a re-encode when the message gets sent to the consumer
+         message.setAddress(defaultAddress);
       }
 
       if (logger.isTraceEnabled()) {
@@ -1359,7 +1338,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw ActiveMQMessageBundle.BUNDLE.noAddress();
       }
 
-      if (message.getAddress().equals(managementAddress)) {
+      if (message.getAddressSimpleString().equals(managementAddress)) {
          // It's a management message
 
          handleManagementMessage(tx, message, direct);
@@ -1369,32 +1348,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       return result;
    }
 
-   @Override
-   public void sendContinuations(final int packetSize,
-                                 final long messageBodySize,
-                                 final byte[] body,
-                                 final boolean continues) throws Exception {
-      if (currentLargeMessage == null) {
-         throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
-      }
-
-      // Immediately release the credits for the continuations- these don't contribute to the in-memory size
-      // of the message
-
-      currentLargeMessage.addBytes(body);
-
-      if (!continues) {
-         currentLargeMessage.releaseResources();
-
-         if (messageBodySize >= 0) {
-            currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
-         }
-
-         doSend(tx, currentLargeMessage, false, false);
-
-         currentLargeMessage = null;
-      }
-   }
 
    @Override
    public void requestProducerCredits(SimpleString address, final int credits) throws Exception {
@@ -1566,10 +1519,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       connectionFailed(me, failedOver);
    }
 
-   public void clearLargeMessage() {
-      currentLargeMessage = null;
-   }
-
    private void installJMSHooks() {
    }
 
@@ -1588,10 +1537,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    private RoutingStatus handleManagementMessage(final Transaction tx,
-                                                 final ServerMessage message,
+                                                 final Message message,
                                                  final boolean direct) throws Exception {
       try {
-         securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this);
+         securityCheck(removePrefix(message.getAddressSimpleString()), CheckType.MANAGE, this);
       } catch (ActiveMQException e) {
          if (!autoCommitSends) {
             tx.markAsRollbackOnly(e);
@@ -1599,7 +1548,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw e;
       }
 
-      ServerMessage reply = managementService.handleMessage(message);
+      Message reply = managementService.handleMessage(message);
 
       SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
 
@@ -1669,8 +1618,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       theTx.rollback();
    }
 
+   @Override
    public RoutingStatus doSend(final Transaction tx,
-                               final ServerMessage msg,
+                               final Message msg,
                                final boolean direct,
                                final boolean noAutoCreateQueue) throws Exception {
       RoutingStatus result = RoutingStatus.OK;
@@ -1683,7 +1633,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) {
          routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE));
       }
-      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddress(), routingType);
+      Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
 
       // Consumer
       // check the user has write access to this address.
@@ -1707,12 +1657,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          result = postOffice.route(msg, routingContext, direct);
 
-         Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
+         Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
 
          if (value == null) {
-            targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
+            // TODO-now: userID
+            targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>((UUID)msg.getUserID(), new AtomicLong(1)));
          } else {
-            value.setA(msg.getUserID());
+            // TODO-now: userID
+            value.setA((UUID)msg.getUserID());
             value.getB().incrementAndGet();
          }
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 0222928..84ab636 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -42,7 +43,7 @@ import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -128,5 +129,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
 
    Object[] getResources(Class<?> resourceType);
 
-   ServerMessage handleMessage(ServerMessage message) throws Exception;
+   Message handleMessage(Message message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 55f2aea..002b2c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
 import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.AcceptorControl;
@@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImp
 import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
 import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
 import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -71,13 +74,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
@@ -365,9 +365,9 @@ public class ManagementServiceImpl implements ManagementService {
    }
 
    @Override
-   public ServerMessage handleMessage(final ServerMessage message) throws Exception {
+   public Message handleMessage(final Message message) throws Exception {
       // a reply message is sent with the result stored in the message body.
-      ServerMessage reply = new ServerMessageImpl(storageManager.generateID(), 512);
+      Message reply = new CoreMessage(storageManager.generateID(), 512);
 
       String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
       if (logger.isDebugEnabled()) {
@@ -631,7 +631,7 @@ public class ManagementServiceImpl implements ManagementService {
 
                long messageID = storageManager.generateID();
 
-               ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
+               Message notificationMessage = new CoreMessage(messageID, 512);
 
                // Notification messages are always durable so the user can choose whether to add a durable queue to
                // consume them in

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
index efe4cf9..e686a2c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionDetail.java
@@ -26,8 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.JsonUtil;
+
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.utils.JsonLoader;
 
@@ -97,7 +98,7 @@ public abstract class TransactionDetail {
 
             msgJson.add(KEY_MSG_OP_TYPE, opType);
 
-            ServerMessage msg = ref.getMessage().copy();
+            Message msg = ref.getMessage().copy();
 
             msgJson.add(KEY_MSG_TYPE, decodeMessageType(msg));
             JsonUtil.addToObject(KEY_MSG_PROPERTIES, decodeMessageProperties(msg), msgJson);
@@ -108,7 +109,7 @@ public abstract class TransactionDetail {
       return detailJson.build();
    }
 
-   public abstract String decodeMessageType(ServerMessage msg);
+   public abstract String decodeMessageType(Message msg);
 
-   public abstract Map<String, Object> decodeMessageProperties(ServerMessage msg);
+   public abstract Map<String, Object> decodeMessageProperties(Message msg);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
index 4730596..c885341 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java
@@ -20,7 +20,7 @@ import javax.transaction.xa.Xid;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionDetail;
 
@@ -31,7 +31,7 @@ public class CoreTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public String decodeMessageType(ServerMessage msg) {
+   public String decodeMessageType(Message msg) {
       int type = msg.getType();
       switch (type) {
          case Message.DEFAULT_TYPE: // 0
@@ -52,7 +52,7 @@ public class CoreTransactionDetail extends TransactionDetail {
    }
 
    @Override
-   public Map<String, Object> decodeMessageProperties(ServerMessage msg) {
+   public Map<String, Object> decodeMessageProperties(Message msg) {
       return msg.toMap();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a342e13..3a5e2bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -16,12 +16,11 @@
  */
 package org.apache.activemq.artemis.spi.core.protocol;
 
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.Message;
 
-// TODO: use this interface properly on OpenWire
 public interface MessageConverter {
 
-   ServerMessage inbound(Object messageInbound) throws Exception;
+   Message inbound(Object messageInbound) throws Exception;
 
-   Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception;
+   Object outbound(Message messageOutbound, int deliveryCount) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
new file mode 100644
index 0000000..14891f5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.protocol;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.jboss.logging.Logger;
+
+public class MessagePersister implements Persister<Message> {
+
+   private static final Logger logger = Logger.getLogger(MessagePersister.class);
+
+   private static final MessagePersister theInstance = new MessagePersister();
+
+   /** This will be used for reading messages */
+   private static Map<Byte, Persister<Message>> protocols = new ConcurrentHashMap<>();
+
+
+   public static void registerProtocol(ProtocolManagerFactory manager) {
+      Persister<Message> messagePersister = manager.getPersister();
+      if (messagePersister == null) {
+         logger.warn("Cannot find persister for " + manager);
+      } else {
+         registerPersister(manager.getStoreID(), manager.getPersister());
+      }
+   }
+
+   public static void clearPersisters() {
+      protocols.clear();
+   }
+
+   public static void registerPersister(byte recordType, Persister<Message> persister) {
+      protocols.put(recordType, persister);
+   }
+
+   public static MessagePersister getInstance() {
+      return theInstance;
+   }
+
+
+   protected MessagePersister() {
+   }
+
+   protected byte getID() {
+      return (byte)0;
+   }
+
+   @Override
+   public int getEncodeSize(Message record) {
+      return 0;
+   }
+
+
+   /** Sub classes must add the first short as the protocol-id */
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      buffer.writeByte(getID());
+   }
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message record) {
+      byte protocol = buffer.readByte();
+      Persister<Message> persister = protocols.get(protocol);
+      if (persister == null) {
+         throw new NullPointerException("couldn't find factory for type=" + protocol);
+      }
+      return persister.decode(buffer, record);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 890fbfe..c2b7334 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
+/**
+ * Info: ProtocolManager is loaded by {@link org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl#loadProtocolManagerFactories(Iterable)} */
 public interface ProtocolManager<P extends BaseInterceptor> {
 
    ProtocolManagerFactory<P> getFactory();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index d3b1b2e..9574540 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -20,10 +20,25 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public interface ProtocolManagerFactory<P extends BaseInterceptor> {
 
+   /** This is to be used to store the protocol-id on Messages.
+    *  Messages are stored on their bare format.
+    *  The protocol manager will be responsible to code or decode messages.
+    *  The caveat here is that the first short-sized bytes need to be this constant. */
+   default byte getStoreID() {
+      return (byte)0;
+   }
+
+   default Persister<Message> getPersister() {
+      return null;
+   }
+
+
    /**
     * When you create the ProtocolManager, you should filter out any interceptors that won't belong
     * to this Protocol.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ee236c7..799e8b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.artemis.spi.core.protocol;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface SessionCallback {
@@ -55,10 +55,10 @@ public interface SessionCallback {
    //       and I wanted to avoid re-fetching paged data in case of GCs on this specific case.
    //
    //       Future developments may change this, but beware why I have chosen to keep the parameter separated here
-   int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount);
+   int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
 
    int sendLargeMessage(MessageReference reference,
-                        ServerMessage message,
+                        Message message,
                         ServerConsumer consumerID,
                         long bodySize,
                         int deliveryCount);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0c33a35..6fdef44 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -682,22 +682,6 @@
             </xsd:annotation>
          </xsd:element>
 
-         <xsd:element name="perf-blast-pages" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
-         <xsd:element name="run-sync-speed-test" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  XXX Only meant to be used by project developers
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
-
          <xsd:element name="server-dump-interval" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index f374979..5e9a95a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -77,7 +77,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
       Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
       Assert.assertEquals(ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
       Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultJournalLogWriteRate(), conf.isLogJournalWriteRate());
-      Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalPerfBlastPages(), conf.getJournalPerfBlastPages());
       Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultMessageCounterEnabled(), conf.isMessageCounterEnabled());
       Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterMaxDayHistory(), conf.getMessageCounterMaxDayHistory());
       Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageCounterSamplePeriod(), conf.getMessageCounterSamplePeriod());
@@ -232,10 +231,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
          conf.setLogJournalWriteRate(b);
          Assert.assertEquals(b, conf.isLogJournalWriteRate());
 
-         i = RandomUtil.randomInt();
-         conf.setJournalPerfBlastPages(i);
-         Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
          l = RandomUtil.randomLong();
          conf.setServerDumpInterval(l);
          Assert.assertEquals(l, conf.getServerDumpInterval());
@@ -434,10 +429,6 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
       conf.setLogJournalWriteRate(b);
       Assert.assertEquals(b, conf.isLogJournalWriteRate());
 
-      i = RandomUtil.randomInt();
-      conf.setJournalPerfBlastPages(i);
-      Assert.assertEquals(i, conf.getJournalPerfBlastPages());
-
       l = RandomUtil.randomLong();
       conf.setServerDumpInterval(l);
       Assert.assertEquals(l, conf.getServerDumpInterval());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
index d73accd..1eb749b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/filter/impl/FilterTest.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.filter.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.tests.util.SilentTestCase;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
@@ -35,13 +35,13 @@ public class FilterTest extends SilentTestCase {
 
    private Filter filter;
 
-   private ServerMessage message;
+   private Message message;
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      message = new ServerMessageImpl(1, 1000);
+      message = new CoreMessage().initBuffer(1024).setMessageID(1);
    }
 
    @Test
@@ -59,7 +59,7 @@ public class FilterTest extends SilentTestCase {
 
       message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
       Assert.assertTrue(filter.match(message));
-      message = new ServerMessageImpl();
+      message = new CoreMessage();
       Assert.assertFalse(filter.match(message));
    }
 
@@ -94,7 +94,7 @@ public class FilterTest extends SilentTestCase {
 
       filter = FilterImpl.createFilter(new SimpleString("AMQDurable='NON_DURABLE'"));
 
-      message = new ServerMessageImpl();
+      message = new CoreMessage();
       message.setDurable(true);
 
       Assert.assertFalse(filter.match(message));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 0e9a3f2..92204ce 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -44,7 +45,7 @@ import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -329,7 +330,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
       }
 
       @Override
-      public ServerMessage handleMessage(ServerMessage message) throws Exception {
+      public Message handleMessage(Message message) throws Exception {
          return null;
       }