You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/12 18:36:34 UTC

[activemq-artemis] 01/03: ARTEMIS-2617 use core pools to reduce GC on journal loading

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 5897909dc903cf344e218b976f5593395e43336c
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Feb 7 13:47:02 2020 +0100

    ARTEMIS-2617 use core pools to reduce GC on journal loading
---
 .../artemis/cli/commands/etc/artemis.profile       |  2 +-
 .../artemis/cli/commands/etc/artemis.profile.cmd   |  2 +-
 .../activemq/artemis/api/core/SimpleString.java    |  8 +++--
 .../artemis/core/persistence/Persister.java        | 18 ++++++-----
 .../org/apache/activemq/artemis/utils/UUID.java    |  2 ++
 .../artemis/utils/collections/TypedProperties.java | 36 ++++++++++++++--------
 .../apache/activemq/artemis/api/core/Message.java  |  4 +--
 .../artemis/core/message/impl/CoreMessage.java     | 20 +++++++-----
 .../core/message/impl/CoreMessageObjectPools.java  | 26 +++++++++++++---
 .../core/message/impl/CoreMessagePersister.java    | 16 ++++++----
 .../core/message/impl/MessageInternalImpl.java     |  4 +--
 .../artemis/core/journal/EncoderPersister.java     |  2 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  4 +--
 .../protocol/amqp/broker/AMQPMessagePersister.java | 14 ++++++---
 .../amqp/broker/AMQPMessagePersisterV2.java        | 21 +++++++++----
 .../amqp/broker/ProtonProtocolManagerFactory.java  |  3 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      |  2 +-
 .../core/protocol/openwire/OpenwireMessage.java    |  4 +--
 .../journal/AbstractJournalStorageManager.java     | 25 +++++++++++++--
 .../impl/journal/codec/LargeMessagePersister.java  |  2 +-
 .../core/impl/CoreProtocolManagerFactory.java      |  3 +-
 .../spi/core/protocol/MessagePersister.java        | 15 ++++-----
 .../spi/core/protocol/ProtocolManagerFactory.java  |  3 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  4 +--
 .../tests/integration/client/AcknowledgeTest.java  |  4 +--
 .../replication/SharedNothingReplicationTest.java  |  7 +++--
 26 files changed, 168 insertions(+), 83 deletions(-)

diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index b50cf28..2ce58f2 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}'
 
 # Java Opts
 if [ -z "$JAVA_ARGS" ]; then
-    JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq  -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
+    JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq  -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
 fi
 
 #
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
index 6778eba..f653b27 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
@@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be
 rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446
 
 rem Java Opts
-IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml [...]
+IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram  -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANC [...]
 
 rem Logs Safepoints JVM pauses: Uncomment to enable them
 rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 7767fdd..91d884b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
 
    public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
 
-      private static final int UUID_LENGTH = 36;
+      public static final int DEFAULT_MAX_LENGTH = 36;
 
       private final int maxLength;
 
       public ByteBufSimpleStringPool() {
-         this.maxLength = UUID_LENGTH;
+         this.maxLength = DEFAULT_MAX_LENGTH;
+      }
+
+      public ByteBufSimpleStringPool(final int capacity) {
+         this(capacity, DEFAULT_MAX_LENGTH);
       }
 
       public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
index 124dfcf..bf8df3f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -19,20 +19,22 @@ package org.apache.activemq.artemis.core.persistence;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 
-public interface Persister<T extends Object> {
-
-   /** This is to be used to store the protocol-id on Messages.
-    *  Messages are stored on their bare format.
-    *  The protocol manager will be responsible to code or decode messages.
-    *  The caveat here is that the first short-sized bytes need to be this constant. */
+public interface Persister<T extends Object, A> {
+
+   /**
+    * This is to be used to store the protocol-id on Messages.
+    * Messages are stored in their bare format.
+    * The protocol manager will be responsible for encoding and decoding messages.
+    * The caveat here is that the first short-sized bytes need to be this constant.
+    */
    default byte getID() {
-      return (byte)0;
+      return (byte) 0;
    }
 
    int getEncodeSize(T record);
 
    void encode(ActiveMQBuffer buffer, T record);
 
-   T decode(ActiveMQBuffer buffer, T record);
+   T decode(ActiveMQBuffer buffer, A arg);
 
 }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 7d8e984..64ff432 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -98,6 +98,7 @@ public final class UUID {
     * @param data 16 byte UUID contents
     */
    public UUID(final int type, final byte[] data) {
+      assert data.length == 16;
       mId = data;
       // Type is multiplexed with time_hi:
       mId[UUID.INDEX_TYPE] &= (byte) 0x0F;
@@ -108,6 +109,7 @@ public final class UUID {
    }
 
    private UUID(final byte[] data) {
+      assert data.length == 16;
       mId = data;
    }
 
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 223da17..df81c28 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -446,17 +446,21 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer,
-                                   final TypedPropertiesDecoderPools keyValuePools) {
+                                   final TypedPropertiesDecoderPools keyValuePools,
+                                   boolean replaceExisting) {
       byte b = buffer.readByte();
       if (b == DataConstants.NULL) {
-         properties = null;
-         size = 0;
+         if (replaceExisting) {
+            properties = null;
+            size = 0;
+         }
       } else {
          int numHeaders = buffer.readInt();
-
-         //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
-         properties = new HashMap<>(numHeaders, 1.0f);
-         size = 0;
+         if (replaceExisting || properties == null) {
+            //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
+            properties = new HashMap<>(numHeaders, 1.0f);
+         }
+         size = properties.size();
 
          for (int i = 0; i < numHeaders; i++) {
             final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@@ -529,6 +533,10 @@ public class TypedProperties {
       }
    }
 
+   public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
+      decode(buffer, keyValuePools, true);
+   }
+
    public void decode(final ByteBuf buffer) {
       decode(buffer, null);
    }
@@ -1029,12 +1037,16 @@ public class TypedProperties {
 
       public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
 
-         private static final int UUID_LENGTH = 36;
+         public static final int DEFAULT_MAX_LENGTH = 36;
 
          private final int maxLength;
 
          public ByteBufStringValuePool() {
-            this.maxLength = UUID_LENGTH;
+            this.maxLength = DEFAULT_MAX_LENGTH;
+         }
+
+         public ByteBufStringValuePool(final int capacity) {
+            this(capacity, DEFAULT_MAX_LENGTH);
          }
 
          public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
@@ -1074,9 +1086,9 @@ public class TypedProperties {
          this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
       }
 
-      public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
-         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
-         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
+      public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) {
+         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity);
+         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity);
       }
 
       public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1de8150..09c2d39 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -394,7 +394,7 @@ public interface Message {
     */
    Message setDurable(boolean durable);
 
-   Persister<Message> getPersister();
+   Persister<Message, CoreMessageObjectPools> getPersister();
 
    String getAddress();
 
@@ -454,7 +454,7 @@ public interface Message {
 
    void persist(ActiveMQBuffer targetRecord);
 
-   void reloadPersistence(ActiveMQBuffer record);
+   void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
 
    default void releaseBuffer() {
       ByteBuf buffer = getBuffer();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9050550..b61b27e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       return CoreMessagePersister.getInstance();
    }
 
@@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    private void decode(boolean beforeAddress) {
+      decode(beforeAddress, coreMessageObjectPools);
+   }
+
+   private void decode(boolean beforeAddress, CoreMessageObjectPools pools) {
       endOfBodyPosition = buffer.readInt();
 
       buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
 
-      decodeHeadersAndProperties(buffer, true);
+      decodeHeadersAndProperties(buffer, true, pools);
       buffer.readerIndex(0);
       validBuffer = true;
 
@@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    public void decodeHeadersAndProperties(final ByteBuf buffer) {
-      decodeHeadersAndProperties(buffer, false);
+      decodeHeadersAndProperties(buffer, false, coreMessageObjectPools);
    }
 
-   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
 
-      address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
+      address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool());
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
-         properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
+         properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools());
       }
    }
 
@@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       int size = record.readInt();
       initBuffer(size);
       buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
-      decode(false);
+      decode(false, pools);
    }
 
    @Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
index 4c56eac..62ee5ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -25,14 +25,30 @@ import java.util.function.Supplier;
 
 public class CoreMessageObjectPools {
 
-   private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
-   private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+   private final Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool;
+   private final Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools;
 
-   private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
-   private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
-   private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+   private final Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool;
+   private final Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool;
+   private final Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools;
+
+   public CoreMessageObjectPools(int addressPoolCapacity,
+                                 int groupIdCapacity,
+                                 int propertyKeyCapacity,
+                                 int propertyValueCapacity) {
+      addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity));
+      propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity));
+      groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity));
+      addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity));
+      propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity));
+   }
 
    public CoreMessageObjectPools() {
+      addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+      propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+      groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+      addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+      propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
    }
 
    public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
index cbd565d..3861d67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
-public class CoreMessagePersister implements Persister<Message> {
+public class CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
    public static final byte ID = 1;
 
    private static CoreMessagePersister theInstance;
@@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister<Message> {
       record.persist(buffer);
    }
 
-
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
       // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
       long id = buffer.readLong();
-      SimpleString address = buffer.readNullableSimpleString();
-      record = new CoreMessage();
-      record.reloadPersistence(buffer);
+      final SimpleString address;
+      if (pool == null) {
+         address = buffer.readNullableSimpleString();
+      } else {
+         address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+      }
+      CoreMessage record = new CoreMessage();
+      record.reloadPersistence(buffer, pool);
       record.setMessageID(id);
       record.setAddress(address);
       return record;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index e1dcf97..0f809ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       throw new UnsupportedOperationException();
    }
 
@@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       throw new UnsupportedOperationException();
    }
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
index 8fc2a5aa..1e734d3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
 
 /** This is a facade between the new Persister and the former EncodingSupport.
  *  Methods using the old interface will use this as a facade to provide the previous semantic. */
-public class EncoderPersister implements Persister<EncodingSupport> {
+public class EncoderPersister implements Persister<EncodingSupport, EncodingSupport> {
 
    private static final EncoderPersister theInstance = new EncoderPersister();
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index fcfe10e..be758de 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -741,7 +741,7 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       int size = record.readInt();
       byte[] recordArray = new byte[size];
       record.readBytes(recordArray);
@@ -771,7 +771,7 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+   public Persister<org.apache.activemq.artemis.api.core.Message, CoreMessageObjectPools> getPersister() {
       return AMQPMessagePersisterV2.getInstance();
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index c688124..9ab0842 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister {
    }
 
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
       long id = buffer.readLong();
       long format = buffer.readLong();
-      SimpleString address = buffer.readNullableSimpleString();
-      record = new AMQPMessage(format);
-      record.reloadPersistence(buffer);
+      final SimpleString address;
+      if (pool == null) {
+         address = buffer.readNullableSimpleString();
+      } else {
+         address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+      }
+      AMQPMessage record = new AMQPMessage(format);
+      record.reloadPersistence(buffer, pool);
       record.setMessageID(id);
       if (address != null) {
          record.setAddress(address);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
index c268694..d263fe9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
@@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
       }
    }
 
-
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
-      AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+      AMQPMessage message = (AMQPMessage) super.decode(buffer, pool);
       int size = buffer.readInt();
 
       if (size != 0) {
-         TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
-         properties.decode(buffer.byteBuf());
-         message.setExtraProperties(properties);
+         // message::setAddress could have already populated the extra properties;
+         // hence, we decode into the existing instance and can safely replace a value
+         // on the properties if it has been encoded differently in the rest of the buffer
+         TypedProperties existingExtraProperties = message.getExtraProperties();
+         TypedProperties extraProperties = existingExtraProperties;
+         if (existingExtraProperties == null) {
+            extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
+         }
+         extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
+         if (extraProperties != existingExtraProperties) {
+            message.setExtraProperties(extraProperties);
+         }
       }
       return message;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index cd21e46..7470d60 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
@@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
    private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
 
    @Override
-   public Persister<Message>[] getPersister() {
+   public Persister<Message, CoreMessageObjectPools>[] getPersister() {
 
       Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
       return persisters;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 88414b4..695bf0f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -147,7 +147,7 @@ public class AMQPMessageTest {
       final long persistedSize = (long) encoded.readableBytes();
 
       // Now reload from encoded data
-      message.reloadPersistence(encoded);
+      message.reloadPersistence(encoded, null);
 
       assertEquals(persistedSize, message.getPersistSize());
       assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 45e8953..9644b70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -140,7 +140,7 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       return null;
    }
 
@@ -205,7 +205,7 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index effb7e1..296b221 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
@@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          }
 
          final MutableLong recordNumber = new MutableLong();
+         final CoreMessageObjectPools pools;
+         if (totalSize > 0) {
+            final int addresses = (int)Math.max(
+               DEFAULT_POOL_CAPACITY,
+               queueInfos == null ? 0 :
+                  queueInfos.values().stream()
+                     .map(QueueBindingInfo::getAddress)
+                     .filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
+                     .count() * 2);
+            pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
+         } else {
+            pools = null;
+         }
          // This will free up memory sooner while reading the records
          records.clear(record -> {
             try {
@@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                   case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
 
-                     Message message = MessagePersister.getInstance().decode(buff, null);
+                     Message message = MessagePersister.getInstance().decode(buff, pools);
 
                      messages.put(record.id, message);
 
@@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                                          final Set<Pair<Long, Long>> pendingLargeMessages,
                                          JournalLoader journalLoader) throws Exception {
       // recover prepared transactions
+      CoreMessageObjectPools pools = null;
+
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
          XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
 
@@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   break;
                }
                case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
-                  Message message = MessagePersister.getInstance().decode(buff, null);
+                  if (pools == null) {
+                     pools = new CoreMessageObjectPools();
+                  }
+                  Message message = MessagePersister.getInstance().decode(buff, pools);
 
                   messages.put(record.id, message);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
index b715f97..4e02f68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 
-public class LargeMessagePersister implements Persister<LargeServerMessage> {
+public class LargeMessagePersister implements Persister<LargeServerMessage, LargeServerMessage> {
 
    private static final LargeMessagePersister theInstance = new LargeMessagePersister();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7590924..6d5e352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
    private static final String MODULE_NAME = "artemis-server";
 
    @Override
-   public Persister<Message>[] getPersister() {
+   public Persister<Message, CoreMessageObjectPools>[] getPersister() {
       return new Persister[]{CoreMessagePersister.getInstance()};
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
index ad1317f..2fddc56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -21,11 +21,12 @@ import java.util.ServiceLoader;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.jboss.logging.Logger;
 
-public class MessagePersister implements Persister<Message> {
+public class MessagePersister implements Persister<Message, CoreMessageObjectPools> {
 
    private static final Logger logger = Logger.getLogger(MessagePersister.class);
 
@@ -33,7 +34,7 @@ public class MessagePersister implements Persister<Message> {
 
    /** This will be used for reading messages */
    private static final int MAX_PERSISTERS = 3;
-   private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
+   private static final Persister<Message, CoreMessageObjectPools>[] persisters = new Persister[MAX_PERSISTERS];
 
    static {
       CoreMessagePersister persister = CoreMessagePersister.getInstance();
@@ -46,7 +47,7 @@ public class MessagePersister implements Persister<Message> {
    }
 
    public static void registerProtocol(ProtocolManagerFactory manager) {
-      Persister<Message>[] messagePersisters = manager.getPersister();
+      Persister<Message, CoreMessageObjectPools>[] messagePersisters = manager.getPersister();
       if (messagePersisters == null || messagePersisters.length == 0) {
          logger.debug("Cannot find persister for " + manager);
       } else {
@@ -69,7 +70,7 @@ public class MessagePersister implements Persister<Message> {
       return persisters[id - 1];
    }
 
-   public static void registerPersister(Persister<Message> persister) {
+   public static void registerPersister(Persister<Message, CoreMessageObjectPools> persister) {
       if (persister != null) {
          assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
          persisters[persister.getID() - 1] = persister;
@@ -97,12 +98,12 @@ public class MessagePersister implements Persister<Message> {
    }
 
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) {
       byte protocol = buffer.readByte();
-      Persister<Message> persister = getPersister(protocol);
+      Persister<Message, CoreMessageObjectPools> persister = getPersister(protocol);
       if (persister == null) {
          throw new NullPointerException("couldn't find factory for type=" + protocol);
       }
-      return persister.decode(buffer, record);
+      return persister.decode(buffer, pools);
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index 4ab34eb..77c66f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -21,12 +21,13 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public interface ProtocolManagerFactory<P extends BaseInterceptor> {
 
-   default Persister<Message>[] getPersister() {
+   default Persister<Message, CoreMessageObjectPools>[] getPersister() {
       return new Persister[]{};
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b3ae240..d0ed6f8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void reloadPersistence(ActiveMQBuffer record) {
+      public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
       }
 
       @Override
-      public Persister<Message> getPersister() {
+      public Persister<Message, CoreMessageObjectPools> getPersister() {
          return null;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 511d476..dc23bc9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Persister<Message> getPersister() {
+      public Persister<Message, CoreMessageObjectPools> getPersister() {
          return null;
       }
 
       @Override
-      public void reloadPersistence(ActiveMQBuffer record) {
+      public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
index 7cd4b05..f05552b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
@@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
       return conf;
    }
 
-   static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> {
+   static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
 
       boolean used = false;
 
@@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Message decode(ActiveMQBuffer buffer, Message record) {
-         return persister.decode(buffer, record);
+      public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+         return persister.decode(buffer, pool);
       }
    }