You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/12 18:36:33 UTC

[activemq-artemis] branch master updated (0b8c33b -> 69779ee)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 0b8c33b  NO-JIRA: Adding missing module to broker-federation pom
     new 5897909  ARTEMIS-2617 use core pools to reduce GC on journal loading
     new d42267f  ARTEMIS-2617 Lazy scan AMQP message data
     new 69779ee  This closes #2975

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/cli/commands/etc/artemis.profile       |   2 +-
 .../artemis/cli/commands/etc/artemis.profile.cmd   |   2 +-
 .../activemq/artemis/api/core/SimpleString.java    |   8 +-
 .../artemis/core/persistence/Persister.java        |  18 +--
 .../org/apache/activemq/artemis/utils/UUID.java    |   2 +
 .../activemq/artemis/utils/algo/KMPNeedle.java     | 103 ++++++++++++++++
 .../artemis/utils/collections/TypedProperties.java |  36 ++++--
 .../apache/activemq/artemis/api/core/Message.java  |   4 +-
 .../artemis/core/message/impl/CoreMessage.java     |  20 ++--
 .../core/message/impl/CoreMessageObjectPools.java  |  26 ++++-
 .../core/message/impl/CoreMessagePersister.java    |  16 ++-
 .../core/message/impl/MessageInternalImpl.java     |   4 +-
 .../artemis/core/journal/EncoderPersister.java     |   2 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 129 ++++++++++++++++++---
 .../protocol/amqp/broker/AMQPMessagePersister.java |  14 ++-
 .../amqp/broker/AMQPMessagePersisterV2.java        |  21 +++-
 .../amqp/broker/AMQPMessageSymbolSearch.java       |  87 ++++++++++++++
 .../amqp/broker/ProtonProtocolManagerFactory.java  |   3 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      |  73 +++++++++++-
 .../core/protocol/openwire/OpenwireMessage.java    |   4 +-
 .../journal/AbstractJournalStorageManager.java     |  25 +++-
 .../impl/journal/codec/LargeMessagePersister.java  |   2 +-
 .../core/impl/CoreProtocolManagerFactory.java      |   3 +-
 .../spi/core/protocol/MessagePersister.java        |  15 +--
 .../spi/core/protocol/ProtocolManagerFactory.java  |   3 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   4 +-
 .../tests/integration/client/AcknowledgeTest.java  |   4 +-
 .../replication/SharedNothingReplicationTest.java  |   7 +-
 28 files changed, 537 insertions(+), 100 deletions(-)
 create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java
 create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java


[activemq-artemis] 02/03: ARTEMIS-2617 Lazy scan AMQP message data

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d42267f05a670292c4f8e20a70471a95ed8ff5bc
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Sun Feb 9 12:54:14 2020 +0100

    ARTEMIS-2617 Lazy scan AMQP message data
---
 .../activemq/artemis/utils/algo/KMPNeedle.java     | 103 +++++++++++++++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 125 ++++++++++++++++++---
 .../amqp/broker/AMQPMessageSymbolSearch.java       |  87 ++++++++++++++
 .../protocol/amqp/broker/AMQPMessageTest.java      |  71 +++++++++++-
 4 files changed, 369 insertions(+), 17 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java
new file mode 100644
index 0000000..22910c8
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/algo/KMPNeedle.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.algo;
+
+import java.util.Objects;
+
+/**
+ * Abstraction of {@code byte[] }<a href="https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm">Knuth-Morris-Pratt</a>'s needle to be used
+ * to perform pattern matching over indexed haystack of {@code byte}s.
+ */
+public final class KMPNeedle {
+
+   @FunctionalInterface
+   public interface IndexedByteSupplier<S> {
+
+      byte get(S source, int index);
+   }
+
+   private final int[] jumpTable;
+   private final byte[] needle;
+
+   private KMPNeedle(byte[] needle) {
+      Objects.requireNonNull(needle);
+      this.needle = needle;
+      this.jumpTable = createJumpTable(needle);
+   }
+
+   private static int[] createJumpTable(byte[] needle) {
+      final int[] jumpTable = new int[needle.length + 1];
+      int j = 0;
+      for (int i = 1; i < needle.length; i++) {
+         while (j > 0 && needle[j] != needle[i]) {
+            j = jumpTable[j];
+         }
+         if (needle[j] == needle[i]) {
+            j++;
+         }
+         jumpTable[i + 1] = j;
+      }
+      for (int i = 1; i < jumpTable.length; i++) {
+         if (jumpTable[i] != 0) {
+            return jumpTable;
+         }
+      }
+      // optimization over the original algorithm: it would save from accessing any jump table
+      return null;
+   }
+
+   /**
+    * https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm search algorithm:
+    *
+    * This version differ from the original algorithm, because allows to fail fast (and faster) if
+    * the remaining haystack to be processed is < of the remaining needle to be matched.
+    */
+   public <H> int searchInto(IndexedByteSupplier<? super H> haystackReader, H haystack, int end, int start) {
+      assert end >= 0 && start >= 0 && end >= start;
+      final int length = end - start;
+      int j = 0;
+      final int needleLength = needle.length;
+      int remainingNeedle = needleLength;
+      for (int i = 0; i < length; i++) {
+         final int remainingHayStack = length - i;
+         if (remainingNeedle > remainingHayStack) {
+            return -1;
+         }
+         final int index = start + i;
+         final byte value = haystackReader.get(haystack, index);
+         while (j > 0 && needle[j] != value) {
+            j = jumpTable == null ? 0 : jumpTable[j];
+            remainingNeedle = needleLength - j;
+         }
+         if (needle[j] == value) {
+            j++;
+            remainingNeedle--;
+            assert remainingNeedle >= 0;
+         }
+         if (j == needleLength) {
+            final int startMatch = index - needleLength + 1;
+            return startMatch;
+         }
+      }
+      return -1;
+   }
+
+   public static KMPNeedle of(byte[] needle) {
+      return new KMPNeedle(needle);
+   }
+
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index be758de..fa96a5c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -43,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.algo.KMPNeedle;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -74,12 +76,49 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.jboss.logging.Logger;
 
-// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
+/**
+ * See <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP v1.0 message format</a>
+ * <pre>
+ *
+ *                                                      Bare Message
+ *                                                            |
+ *                                      .---------------------+--------------------.
+ *                                      |                                          |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * | header | delivery-   | message-    | properties | application- | application- | footer |
+ * |        | annotations | annotations |            | properties   | data         |        |
+ * +--------+-------------+-------------+------------+--------------+--------------+--------+
+ * |                                                                                        |
+ * '-------------------------------------------+--------------------------------------------'
+ *                                             |
+ *                                      Annotated Message
+ * </pre>
+ * <ul>
+ *    <li>Zero or one header sections.
+ *    <li>Zero or one delivery-annotation sections.
+ *    <li>Zero or one message-annotation sections.
+ *    <li>Zero or one properties sections.
+ *    <li>Zero or one application-properties sections.
+ *    <li>The body consists of one of the following three choices:
+ *    <ul>
+ *       <li>one or more data sections
+ *       <li>one or more amqp-sequence sections
+ *       <li>or a single amqp-value section.
+ *    </ul>
+ *    <li>Zero or one footer sections.
+ * </ul>
+ */
 public class AMQPMessage extends RefCountMessage {
 
    private static final Logger logger = Logger.getLogger(AMQPMessage.class);
 
    public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
+   // used to perform quick search
+   private static final Symbol[] SCHEDULED_DELIVERY_SYMBOLS = new Symbol[]{
+      AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY};
+   private static final KMPNeedle[] SCHEDULED_DELIVERY_NEEDLES = new KMPNeedle[]{
+      AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME),
+      AMQPMessageSymbolSearch.kmpNeedleOf(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY)};
 
    public static final int DEFAULT_MESSAGE_FORMAT = 0;
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
@@ -89,7 +128,10 @@ public class AMQPMessage extends RefCountMessage {
 
    // Buffer and state for the data backing this message.
    private ReadableBuffer data;
-   private boolean messageDataScanned;
+   private static final byte NOT_SCANNED = 0;
+   private static final byte RELOAD_PERSISTENCE = 1;
+   private static final byte SCANNED = 2;
+   private byte messageDataScanned;
 
    // Marks the message as needed to be re-encoded to update the backing buffer
    private boolean modified;
@@ -450,16 +492,25 @@ public class AMQPMessage extends RefCountMessage {
    // re-encode should be done to update the backing data with the in memory elements.
 
    private synchronized void ensureMessageDataScanned() {
-      if (!messageDataScanned) {
-         scanMessageData();
+      final byte state = messageDataScanned;
+      switch (state) {
+         case NOT_SCANNED:
+            scanMessageData();
+            break;
+         case RELOAD_PERSISTENCE:
+            lazyScanAfterReloadPersistence();
+            break;
+         case SCANNED:
+            // NO-OP
+            break;
+         default:
+            throw new IllegalStateException("invalid messageDataScanned state: expected within " +
+                                               Arrays.toString(new byte[]{NOT_SCANNED, SCANNED, RELOAD_PERSISTENCE}) +
+                                               " but " + messageDataScanned);
       }
    }
 
-   private synchronized void scanMessageData() {
-      this.messageDataScanned = true;
-      DecoderImpl decoder = TLSEncode.getDecoder();
-      decoder.setBuffer(data.rewind());
-
+   private synchronized void resetMessageData() {
       header = null;
       messageAnnotations = null;
       properties = null;
@@ -474,6 +525,14 @@ public class AMQPMessage extends RefCountMessage {
       propertiesPosition = VALUE_NOT_PRESENT;
       applicationPropertiesPosition = VALUE_NOT_PRESENT;
       remainingBodyPosition = VALUE_NOT_PRESENT;
+   }
+
+   private synchronized void scanMessageData() {
+      this.messageDataScanned = SCANNED;
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      decoder.setBuffer(data.rewind());
+
+      resetMessageData();
 
       try {
          while (data.hasRemaining()) {
@@ -747,12 +806,16 @@ public class AMQPMessage extends RefCountMessage {
       record.readBytes(recordArray);
       data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
 
-      // Message state is now that the underlying buffer is loaded but the contents
-      // not yet scanned, once done the message is fully populated and ready for dispatch.
-      // Force a scan now and tidy the state variables to reflect where we are following
-      // this reload from the store.
+      // Message state is now that the underlying buffer is loaded, but the contents not yet scanned
+      resetMessageData();
+      modified = false;
+      messageDataScanned = RELOAD_PERSISTENCE;
+   }
+
+   private synchronized void lazyScanAfterReloadPersistence() {
+      assert messageDataScanned == RELOAD_PERSISTENCE;
       scanMessageData();
-      messageDataScanned = true;
+      messageDataScanned = SCANNED;
       modified = false;
 
       // Message state should reflect that is came from persistent storage which
@@ -798,7 +861,7 @@ public class AMQPMessage extends RefCountMessage {
 
    private synchronized void encodeMessage() {
       this.modified = false;
-      this.messageDataScanned = false;
+      this.messageDataScanned = NOT_SCANNED;
       int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
       ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
       EncoderImpl encoder = TLSEncode.getEncoder();
@@ -1115,6 +1178,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public RoutingType getRoutingType() {
+      ensureMessageDataScanned();
       Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
 
       if (routingType != null) {
@@ -1184,7 +1248,38 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public boolean hasScheduledDeliveryTime() {
+      if (scheduledTime >= 0) {
+         return true;
+      }
+      return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES);
+   }
+
+   private boolean anyMessageAnnotations(Symbol[] symbols, KMPNeedle[] symbolNeedles) {
+      assert symbols.length == symbolNeedles.length;
+      final int count = symbols.length;
+      if (messageDataScanned == SCANNED) {
+         final MessageAnnotations messageAnnotations = this.messageAnnotations;
+         if (messageAnnotations == null) {
+            return false;
+         }
+         Map<Symbol, Object> map = messageAnnotations.getValue();
+         if (map == null) {
+            return false;
+         }
+         for (int i = 0; i < count; i++) {
+            if (map.containsKey(symbols[i])) {
+               return true;
+            }
+         }
+         return false;
+      }
+      return AMQPMessageSymbolSearch.anyMessageAnnotations(data, symbolNeedles);
+   }
+
+   @Override
    public Long getScheduledDeliveryTime() {
+      ensureMessageDataScanned();
       if (scheduledTime < 0) {
          Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);
          Object objdelay = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java
new file mode 100644
index 0000000..7b046a5
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageSymbolSearch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.utils.algo.KMPNeedle;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.TypeConstructor;
+
+final class AMQPMessageSymbolSearch {
+
+   // used to quick search for MessageAnnotations
+   private static final IdentityHashMap<Class<?>, Boolean> MSG_BODY_TYPES;
+
+   static {
+      // we're including MessageAnnotations here because it will still cause termination
+      final List<Class<?>> classList = Arrays.asList(MessageAnnotations.class, Properties.class,
+                                                     ApplicationProperties.class, Data.class,
+                                                     AmqpSequence.class, AmqpValue.class, Footer.class);
+      MSG_BODY_TYPES = new IdentityHashMap<>(classList.size());
+      classList.forEach(clazz -> MSG_BODY_TYPES.put(clazz, Boolean.TRUE));
+   }
+
+   public static KMPNeedle kmpNeedleOf(Symbol symbol) {
+      return KMPNeedle.of(symbol.toString().getBytes(StandardCharsets.US_ASCII));
+   }
+
+   public static boolean anyMessageAnnotations(ReadableBuffer data, KMPNeedle[] needles) {
+      DecoderImpl decoder = TLSEncode.getDecoder();
+      final int position = data.position();
+      decoder.setBuffer(data.rewind());
+      try {
+         while (data.hasRemaining()) {
+            TypeConstructor<?> constructor = decoder.readConstructor();
+            final Class<?> typeClass = constructor.getTypeClass();
+            if (MSG_BODY_TYPES.containsKey(typeClass)) {
+               if (MessageAnnotations.class.equals(typeClass)) {
+                  final int start = data.position();
+                  constructor.skipValue();
+                  final int end = data.position();
+                  for (int i = 0, count = needles.length; i < count; i++) {
+                     final int foundIndex = needles[i].searchInto(ReadableBuffer::get, data, end, start);
+                     if (foundIndex != -1) {
+                        return true;
+                     }
+                  }
+               }
+               return false;
+            }
+            constructor.skipValue();
+         }
+         return false;
+      } finally {
+         decoder.setBuffer(null);
+         data.position(position);
+      }
+   }
+
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 695bf0f..0f68edf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -156,6 +157,72 @@ public class AMQPMessageTest {
       assertEquals(TEST_TO_ADDRESS, message.getAddress());
    }
 
+   @Test
+   public void testHasScheduledDeliveryTimeReloadPersistence() {
+      final long scheduledTime = System.currentTimeMillis();
+      MessageImpl protonMessage = createProtonMessage();
+      MessageAnnotations annotations = protonMessage.getMessageAnnotations();
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
+      ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
+
+      AMQPMessage message = new AMQPMessage(0);
+      try {
+         message.getProtonMessage();
+         fail("Should throw NPE due to not being initialized yet");
+      } catch (NullPointerException npe) {
+      }
+
+      // Now reload from encoded data
+      message.reloadPersistence(encoded, null);
+
+      assertTrue(message.hasScheduledDeliveryTime());
+      message.getHeader();
+      assertTrue(message.hasScheduledDeliveryTime());
+   }
+
+   @Test
+   public void testHasScheduledDeliveryDelayReloadPersistence() {
+      final long scheduledDelay = 100000;
+      MessageImpl protonMessage = createProtonMessage();
+      MessageAnnotations annotations = protonMessage.getMessageAnnotations();
+      annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
+      ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
+
+      AMQPMessage message = new AMQPMessage(0);
+      try {
+         message.getProtonMessage();
+         fail("Should throw NPE due to not being initialized yet");
+      } catch (NullPointerException npe) {
+      }
+
+      // Now reload from encoded data
+      message.reloadPersistence(encoded, null);
+
+      assertTrue(message.hasScheduledDeliveryTime());
+      message.getHeader();
+      assertTrue(message.hasScheduledDeliveryTime());
+   }
+
+   @Test
+   public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
+      MessageImpl protonMessage = createProtonMessage();
+      ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
+
+      AMQPMessage message = new AMQPMessage(0);
+      try {
+         message.getProtonMessage();
+         fail("Should throw NPE due to not being initialized yet");
+      } catch (NullPointerException npe) {
+      }
+
+      // Now reload from encoded data
+      message.reloadPersistence(encoded, null);
+
+      assertFalse(message.hasScheduledDeliveryTime());
+      message.getHeader();
+      assertFalse(message.hasScheduledDeliveryTime());
+   }
+
    //----- Test Memory Estimate access ---------------------------------------//
 
    @Test
@@ -2010,10 +2077,10 @@ public class AMQPMessageTest {
       properties.setTo(TEST_TO_ADDRESS);
       properties.setMessageId(UUID.randomUUID());
 
-      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>());
       annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
 
-      ApplicationProperties applicationProperties = new ApplicationProperties(new HashMap<>());
+      ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
       applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
 
       AmqpValue body = new AmqpValue(TEST_STRING_BODY);


[activemq-artemis] 01/03: ARTEMIS-2617 use core pools to reduce GC on journal loading

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 5897909dc903cf344e218b976f5593395e43336c
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Feb 7 13:47:02 2020 +0100

    ARTEMIS-2617 use core pools to reduce GC on journal loading
---
 .../artemis/cli/commands/etc/artemis.profile       |  2 +-
 .../artemis/cli/commands/etc/artemis.profile.cmd   |  2 +-
 .../activemq/artemis/api/core/SimpleString.java    |  8 +++--
 .../artemis/core/persistence/Persister.java        | 18 ++++++-----
 .../org/apache/activemq/artemis/utils/UUID.java    |  2 ++
 .../artemis/utils/collections/TypedProperties.java | 36 ++++++++++++++--------
 .../apache/activemq/artemis/api/core/Message.java  |  4 +--
 .../artemis/core/message/impl/CoreMessage.java     | 20 +++++++-----
 .../core/message/impl/CoreMessageObjectPools.java  | 26 +++++++++++++---
 .../core/message/impl/CoreMessagePersister.java    | 16 ++++++----
 .../core/message/impl/MessageInternalImpl.java     |  4 +--
 .../artemis/core/journal/EncoderPersister.java     |  2 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  4 +--
 .../protocol/amqp/broker/AMQPMessagePersister.java | 14 ++++++---
 .../amqp/broker/AMQPMessagePersisterV2.java        | 21 +++++++++----
 .../amqp/broker/ProtonProtocolManagerFactory.java  |  3 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      |  2 +-
 .../core/protocol/openwire/OpenwireMessage.java    |  4 +--
 .../journal/AbstractJournalStorageManager.java     | 25 +++++++++++++--
 .../impl/journal/codec/LargeMessagePersister.java  |  2 +-
 .../core/impl/CoreProtocolManagerFactory.java      |  3 +-
 .../spi/core/protocol/MessagePersister.java        | 15 ++++-----
 .../spi/core/protocol/ProtocolManagerFactory.java  |  3 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  4 +--
 .../tests/integration/client/AcknowledgeTest.java  |  4 +--
 .../replication/SharedNothingReplicationTest.java  |  7 +++--
 26 files changed, 168 insertions(+), 83 deletions(-)

diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index b50cf28..2ce58f2 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}'
 
 # Java Opts
 if [ -z "$JAVA_ARGS" ]; then
-    JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq  -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
+    JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq  -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
 fi
 
 #
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
index 6778eba..f653b27 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
@@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be
 rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446
 
 rem Java Opts
-IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml [...]
+IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram  -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANC [...]
 
 rem Logs Safepoints JVM pauses: Uncomment to enable them
 rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 7767fdd..91d884b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
 
    public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
 
-      private static final int UUID_LENGTH = 36;
+      public static final int DEFAULT_MAX_LENGTH = 36;
 
       private final int maxLength;
 
       public ByteBufSimpleStringPool() {
-         this.maxLength = UUID_LENGTH;
+         this.maxLength = DEFAULT_MAX_LENGTH;
+      }
+
+      public ByteBufSimpleStringPool(final int capacity) {
+         this(capacity, DEFAULT_MAX_LENGTH);
       }
 
       public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
index 124dfcf..bf8df3f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -19,20 +19,22 @@ package org.apache.activemq.artemis.core.persistence;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 
-public interface Persister<T extends Object> {
-
-   /** This is to be used to store the protocol-id on Messages.
-    *  Messages are stored on their bare format.
-    *  The protocol manager will be responsible to code or decode messages.
-    *  The caveat here is that the first short-sized bytes need to be this constant. */
+public interface Persister<T extends Object, A> {
+
+   /**
+    * This is to be used to store the protocol-id on Messages.
+    * Messages are stored on their bare format.
+    * The protocol manager will be responsible to code or decode messages.
+    * The caveat here is that the first short-sized bytes need to be this constant.
+    */
    default byte getID() {
-      return (byte)0;
+      return (byte) 0;
    }
 
    int getEncodeSize(T record);
 
    void encode(ActiveMQBuffer buffer, T record);
 
-   T decode(ActiveMQBuffer buffer, T record);
+   T decode(ActiveMQBuffer buffer, A arg);
 
 }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 7d8e984..64ff432 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -98,6 +98,7 @@ public final class UUID {
     * @param data 16 byte UUID contents
     */
    public UUID(final int type, final byte[] data) {
+      assert data.length == 16;
       mId = data;
       // Type is multiplexed with time_hi:
       mId[UUID.INDEX_TYPE] &= (byte) 0x0F;
@@ -108,6 +109,7 @@ public final class UUID {
    }
 
    private UUID(final byte[] data) {
+      assert data.length == 16;
       mId = data;
    }
 
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 223da17..df81c28 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -446,17 +446,21 @@ public class TypedProperties {
    }
 
    public synchronized void decode(final ByteBuf buffer,
-                                   final TypedPropertiesDecoderPools keyValuePools) {
+                                   final TypedPropertiesDecoderPools keyValuePools,
+                                   boolean replaceExisting) {
       byte b = buffer.readByte();
       if (b == DataConstants.NULL) {
-         properties = null;
-         size = 0;
+         if (replaceExisting) {
+            properties = null;
+            size = 0;
+         }
       } else {
          int numHeaders = buffer.readInt();
-
-         //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
-         properties = new HashMap<>(numHeaders, 1.0f);
-         size = 0;
+         if (replaceExisting || properties == null) {
+            //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
+            properties = new HashMap<>(numHeaders, 1.0f);
+         }
+         size = properties.size();
 
          for (int i = 0; i < numHeaders; i++) {
             final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@@ -529,6 +533,10 @@ public class TypedProperties {
       }
    }
 
+   public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
+      decode(buffer, keyValuePools, true);
+   }
+
    public void decode(final ByteBuf buffer) {
       decode(buffer, null);
    }
@@ -1029,12 +1037,16 @@ public class TypedProperties {
 
       public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
 
-         private static final int UUID_LENGTH = 36;
+         public static final int DEFAULT_MAX_LENGTH = 36;
 
          private final int maxLength;
 
          public ByteBufStringValuePool() {
-            this.maxLength = UUID_LENGTH;
+            this.maxLength = DEFAULT_MAX_LENGTH;
+         }
+
+         public ByteBufStringValuePool(final int capacity) {
+            this(capacity, DEFAULT_MAX_LENGTH);
          }
 
          public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
@@ -1074,9 +1086,9 @@ public class TypedProperties {
          this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
       }
 
-      public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
-         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
-         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
+      public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) {
+         this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity);
+         this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity);
       }
 
       public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1de8150..09c2d39 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -394,7 +394,7 @@ public interface Message {
     */
    Message setDurable(boolean durable);
 
-   Persister<Message> getPersister();
+   Persister<Message, CoreMessageObjectPools> getPersister();
 
    String getAddress();
 
@@ -454,7 +454,7 @@ public interface Message {
 
    void persist(ActiveMQBuffer targetRecord);
 
-   void reloadPersistence(ActiveMQBuffer record);
+   void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
 
    default void releaseBuffer() {
       ByteBuf buffer = getBuffer();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9050550..b61b27e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       return CoreMessagePersister.getInstance();
    }
 
@@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    private void decode(boolean beforeAddress) {
+      decode(beforeAddress, coreMessageObjectPools);
+   }
+
+   private void decode(boolean beforeAddress, CoreMessageObjectPools pools) {
       endOfBodyPosition = buffer.readInt();
 
       buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
 
-      decodeHeadersAndProperties(buffer, true);
+      decodeHeadersAndProperties(buffer, true, pools);
       buffer.readerIndex(0);
       validBuffer = true;
 
@@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    public void decodeHeadersAndProperties(final ByteBuf buffer) {
-      decodeHeadersAndProperties(buffer, false);
+      decodeHeadersAndProperties(buffer, false, coreMessageObjectPools);
    }
 
-   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+   private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) {
       messageIDPosition = buffer.readerIndex();
       messageID = buffer.readLong();
 
-      address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
+      address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool());
       if (buffer.readByte() == DataConstants.NOT_NULL) {
          byte[] bytes = new byte[16];
          buffer.readBytes(bytes);
@@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          propertiesLocation = buffer.readerIndex();
       } else {
          properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
-         properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
+         properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools());
       }
    }
 
@@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       int size = record.readInt();
       initBuffer(size);
       buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
-      decode(false);
+      decode(false, pools);
    }
 
    @Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
index 4c56eac..62ee5ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -25,14 +25,30 @@ import java.util.function.Supplier;
 
 public class CoreMessageObjectPools {
 
-   private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
-   private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+   private final Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool;
+   private final Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools;
 
-   private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
-   private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
-   private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+   private final Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool;
+   private final Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool;
+   private final Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools;
+
+   public CoreMessageObjectPools(int addressPoolCapacity,
+                                 int groupIdCapacity,
+                                 int propertyKeyCapacity,
+                                 int propertyValueCapacity) {
+      addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity));
+      propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity));
+      groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity));
+      addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity));
+      propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity));
+   }
 
    public CoreMessageObjectPools() {
+      addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+      propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+      groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+      addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+      propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
    }
 
    public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
index cbd565d..3861d67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
-public class CoreMessagePersister implements Persister<Message> {
+public class CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
    public static final byte ID = 1;
 
    private static CoreMessagePersister theInstance;
@@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister<Message> {
       record.persist(buffer);
    }
 
-
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
       // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
       long id = buffer.readLong();
-      SimpleString address = buffer.readNullableSimpleString();
-      record = new CoreMessage();
-      record.reloadPersistence(buffer);
+      final SimpleString address;
+      if (pool == null) {
+         address = buffer.readNullableSimpleString();
+      } else {
+         address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+      }
+      CoreMessage record = new CoreMessage();
+      record.reloadPersistence(buffer, pool);
       record.setMessageID(id);
       record.setAddress(address);
       return record;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index e1dcf97..0f809ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       throw new UnsupportedOperationException();
    }
 
@@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       throw new UnsupportedOperationException();
    }
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
index 8fc2a5aa..1e734d3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
 
 /** This is a facade between the new Persister and the former EncodingSupport.
  *  Methods using the old interface will use this as a facade to provide the previous semantic. */
-public class EncoderPersister implements Persister<EncodingSupport> {
+public class EncoderPersister implements Persister<EncodingSupport, EncodingSupport> {
 
    private static final EncoderPersister theInstance = new EncoderPersister();
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index fcfe10e..be758de 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -741,7 +741,7 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
       int size = record.readInt();
       byte[] recordArray = new byte[size];
       record.readBytes(recordArray);
@@ -771,7 +771,7 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+   public Persister<org.apache.activemq.artemis.api.core.Message, CoreMessageObjectPools> getPersister() {
       return AMQPMessagePersisterV2.getInstance();
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index c688124..9ab0842 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.DataConstants;
 
@@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister {
    }
 
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
       long id = buffer.readLong();
       long format = buffer.readLong();
-      SimpleString address = buffer.readNullableSimpleString();
-      record = new AMQPMessage(format);
-      record.reloadPersistence(buffer);
+      final SimpleString address;
+      if (pool == null) {
+         address = buffer.readNullableSimpleString();
+      } else {
+         address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+      }
+      AMQPMessage record = new AMQPMessage(format);
+      record.reloadPersistence(buffer, pool);
       record.setMessageID(id);
       if (address != null) {
          record.setAddress(address);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
index c268694..d263fe9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
@@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
       }
    }
 
-
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
-      AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+      AMQPMessage message = (AMQPMessage) super.decode(buffer, pool);
       int size = buffer.readInt();
 
       if (size != 0) {
-         TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
-         properties.decode(buffer.byteBuf());
-         message.setExtraProperties(properties);
+         // message::setAddress could have populated extra properties
+         // hence, we can safely replace the value on the properties
+         // if it has been encoded differently in the rest of the buffer
+         TypedProperties existingExtraProperties = message.getExtraProperties();
+         TypedProperties extraProperties = existingExtraProperties;
+         if (existingExtraProperties == null) {
+            extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
+         }
+         extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
+         if (extraProperties != existingExtraProperties) {
+            message.setExtraProperties(extraProperties);
+         }
       }
       return message;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index cd21e46..7470d60 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
@@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
    private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
 
    @Override
-   public Persister<Message>[] getPersister() {
+   public Persister<Message, CoreMessageObjectPools>[] getPersister() {
 
       Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
       return persisters;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 88414b4..695bf0f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -147,7 +147,7 @@ public class AMQPMessageTest {
       final long persistedSize = (long) encoded.readableBytes();
 
       // Now reload from encoded data
-      message.reloadPersistence(encoded);
+      message.reloadPersistence(encoded, null);
 
       assertEquals(persistedSize, message.getPersistSize());
       assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 45e8953..9644b70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -140,7 +140,7 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public Persister<Message> getPersister() {
+   public Persister<Message, CoreMessageObjectPools> getPersister() {
       return null;
    }
 
@@ -205,7 +205,7 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public void reloadPersistence(ActiveMQBuffer record) {
+   public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index effb7e1..296b221 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
@@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          }
 
          final MutableLong recordNumber = new MutableLong();
+         final CoreMessageObjectPools pools;
+         if (totalSize > 0) {
+            final int addresses = (int)Math.max(
+               DEFAULT_POOL_CAPACITY,
+               queueInfos == null ? 0 :
+                  queueInfos.values().stream()
+                     .map(QueueBindingInfo::getAddress)
+                     .filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
+                     .count() * 2);
+            pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
+         } else {
+            pools = null;
+         }
          // This will free up memory sooner while reading the records
          records.clear(record -> {
             try {
@@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                   case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
 
-                     Message message = MessagePersister.getInstance().decode(buff, null);
+                     Message message = MessagePersister.getInstance().decode(buff, pools);
 
                      messages.put(record.id, message);
 
@@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                                          final Set<Pair<Long, Long>> pendingLargeMessages,
                                          JournalLoader journalLoader) throws Exception {
       // recover prepared transactions
+      CoreMessageObjectPools pools = null;
+
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
          XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
 
@@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   break;
                }
                case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
-                  Message message = MessagePersister.getInstance().decode(buff, null);
+                  if (pools == null) {
+                     pools = new CoreMessageObjectPools();
+                  }
+                  Message message = MessagePersister.getInstance().decode(buff, pools);
 
                   messages.put(record.id, message);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
index b715f97..4e02f68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 
-public class LargeMessagePersister implements Persister<LargeServerMessage> {
+public class LargeMessagePersister implements Persister<LargeServerMessage, LargeServerMessage> {
 
    private static final LargeMessagePersister theInstance = new LargeMessagePersister();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7590924..6d5e352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
    private static final String MODULE_NAME = "artemis-server";
 
    @Override
-   public Persister<Message>[] getPersister() {
+   public Persister<Message, CoreMessageObjectPools>[] getPersister() {
       return new Persister[]{CoreMessagePersister.getInstance()};
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
index ad1317f..2fddc56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -21,11 +21,12 @@ import java.util.ServiceLoader;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.jboss.logging.Logger;
 
-public class MessagePersister implements Persister<Message> {
+public class MessagePersister implements Persister<Message, CoreMessageObjectPools> {
 
    private static final Logger logger = Logger.getLogger(MessagePersister.class);
 
@@ -33,7 +34,7 @@ public class MessagePersister implements Persister<Message> {
 
    /** This will be used for reading messages */
    private static final int MAX_PERSISTERS = 3;
-   private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
+   private static final Persister<Message, CoreMessageObjectPools>[] persisters = new Persister[MAX_PERSISTERS];
 
    static {
       CoreMessagePersister persister = CoreMessagePersister.getInstance();
@@ -46,7 +47,7 @@ public class MessagePersister implements Persister<Message> {
    }
 
    public static void registerProtocol(ProtocolManagerFactory manager) {
-      Persister<Message>[] messagePersisters = manager.getPersister();
+      Persister<Message, CoreMessageObjectPools>[] messagePersisters = manager.getPersister();
       if (messagePersisters == null || messagePersisters.length == 0) {
          logger.debug("Cannot find persister for " + manager);
       } else {
@@ -69,7 +70,7 @@ public class MessagePersister implements Persister<Message> {
       return persisters[id - 1];
    }
 
-   public static void registerPersister(Persister<Message> persister) {
+   public static void registerPersister(Persister<Message, CoreMessageObjectPools> persister) {
       if (persister != null) {
          assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
          persisters[persister.getID() - 1] = persister;
@@ -97,12 +98,12 @@ public class MessagePersister implements Persister<Message> {
    }
 
    @Override
-   public Message decode(ActiveMQBuffer buffer, Message record) {
+   public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) {
       byte protocol = buffer.readByte();
-      Persister<Message> persister = getPersister(protocol);
+      Persister<Message, CoreMessageObjectPools> persister = getPersister(protocol);
       if (persister == null) {
          throw new NullPointerException("couldn't find factory for type=" + protocol);
       }
-      return persister.decode(buffer, record);
+      return persister.decode(buffer, pools);
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index 4ab34eb..77c66f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -21,12 +21,13 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public interface ProtocolManagerFactory<P extends BaseInterceptor> {
 
-   default Persister<Message>[] getPersister() {
+   default Persister<Message, CoreMessageObjectPools>[] getPersister() {
       return new Persister[]{};
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b3ae240..d0ed6f8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void reloadPersistence(ActiveMQBuffer record) {
+      public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
       }
 
       @Override
-      public Persister<Message> getPersister() {
+      public Persister<Message, CoreMessageObjectPools> getPersister() {
          return null;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 511d476..dc23bc9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Persister<Message> getPersister() {
+      public Persister<Message, CoreMessageObjectPools> getPersister() {
          return null;
       }
 
       @Override
-      public void reloadPersistence(ActiveMQBuffer record) {
+      public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
index 7cd4b05..f05552b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
@@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
       return conf;
    }
 
-   static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> {
+   static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
 
       boolean used = false;
 
@@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Message decode(ActiveMQBuffer buffer, Message record) {
-         return persister.decode(buffer, record);
+      public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+         return persister.decode(buffer, pool);
       }
    }
 


[activemq-artemis] 03/03: This closes #2975

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 69779eed10f47595a55cab317452acc2243e135e
Merge: 0b8c33b d42267f
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Feb 12 13:29:51 2020 -0500

    This closes #2975

 .../artemis/cli/commands/etc/artemis.profile       |   2 +-
 .../artemis/cli/commands/etc/artemis.profile.cmd   |   2 +-
 .../activemq/artemis/api/core/SimpleString.java    |   8 +-
 .../artemis/core/persistence/Persister.java        |  18 +--
 .../org/apache/activemq/artemis/utils/UUID.java    |   2 +
 .../activemq/artemis/utils/algo/KMPNeedle.java     | 103 ++++++++++++++++
 .../artemis/utils/collections/TypedProperties.java |  36 ++++--
 .../apache/activemq/artemis/api/core/Message.java  |   4 +-
 .../artemis/core/message/impl/CoreMessage.java     |  20 ++--
 .../core/message/impl/CoreMessageObjectPools.java  |  26 ++++-
 .../core/message/impl/CoreMessagePersister.java    |  16 ++-
 .../core/message/impl/MessageInternalImpl.java     |   4 +-
 .../artemis/core/journal/EncoderPersister.java     |   2 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 129 ++++++++++++++++++---
 .../protocol/amqp/broker/AMQPMessagePersister.java |  14 ++-
 .../amqp/broker/AMQPMessagePersisterV2.java        |  21 +++-
 .../amqp/broker/AMQPMessageSymbolSearch.java       |  87 ++++++++++++++
 .../amqp/broker/ProtonProtocolManagerFactory.java  |   3 +-
 .../protocol/amqp/broker/AMQPMessageTest.java      |  73 +++++++++++-
 .../core/protocol/openwire/OpenwireMessage.java    |   4 +-
 .../journal/AbstractJournalStorageManager.java     |  25 +++-
 .../impl/journal/codec/LargeMessagePersister.java  |   2 +-
 .../core/impl/CoreProtocolManagerFactory.java      |   3 +-
 .../spi/core/protocol/MessagePersister.java        |  15 +--
 .../spi/core/protocol/ProtocolManagerFactory.java  |   3 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   4 +-
 .../tests/integration/client/AcknowledgeTest.java  |   4 +-
 .../replication/SharedNothingReplicationTest.java  |   7 +-
 28 files changed, 537 insertions(+), 100 deletions(-)