You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/02/12 18:36:34 UTC
[activemq-artemis] 01/03: ARTEMIS-2617 use core pools to reduce GC on journal loading
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 5897909dc903cf344e218b976f5593395e43336c
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Feb 7 13:47:02 2020 +0100
ARTEMIS-2617 use core pools to reduce GC on journal loading
---
.../artemis/cli/commands/etc/artemis.profile | 2 +-
.../artemis/cli/commands/etc/artemis.profile.cmd | 2 +-
.../activemq/artemis/api/core/SimpleString.java | 8 +++--
.../artemis/core/persistence/Persister.java | 18 ++++++-----
.../org/apache/activemq/artemis/utils/UUID.java | 2 ++
.../artemis/utils/collections/TypedProperties.java | 36 ++++++++++++++--------
.../apache/activemq/artemis/api/core/Message.java | 4 +--
.../artemis/core/message/impl/CoreMessage.java | 20 +++++++-----
.../core/message/impl/CoreMessageObjectPools.java | 26 +++++++++++++---
.../core/message/impl/CoreMessagePersister.java | 16 ++++++----
.../core/message/impl/MessageInternalImpl.java | 4 +--
.../artemis/core/journal/EncoderPersister.java | 2 +-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 4 +--
.../protocol/amqp/broker/AMQPMessagePersister.java | 14 ++++++---
.../amqp/broker/AMQPMessagePersisterV2.java | 21 +++++++++----
.../amqp/broker/ProtonProtocolManagerFactory.java | 3 +-
.../protocol/amqp/broker/AMQPMessageTest.java | 2 +-
.../core/protocol/openwire/OpenwireMessage.java | 4 +--
.../journal/AbstractJournalStorageManager.java | 25 +++++++++++++--
.../impl/journal/codec/LargeMessagePersister.java | 2 +-
.../core/impl/CoreProtocolManagerFactory.java | 3 +-
.../spi/core/protocol/MessagePersister.java | 15 ++++-----
.../spi/core/protocol/ProtocolManagerFactory.java | 3 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 4 +--
.../tests/integration/client/AcknowledgeTest.java | 4 +--
.../replication/SharedNothingReplicationTest.java | 7 +++--
26 files changed, 168 insertions(+), 83 deletions(-)
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index b50cf28..2ce58f2 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -33,7 +33,7 @@ ARTEMIS_INSTANCE_ETC_URI='${artemis.instance.etc.uri}'
# Java Opts
if [ -z "$JAVA_ARGS" ]; then
- JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
+ JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml"
fi
#
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
index 6778eba..f653b27 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd
@@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be
rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446
rem Java Opts
-IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml [...]
+IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANC [...]
rem Logs Safepoints JVM pauses: Uncomment to enable them
rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index 7767fdd..91d884b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -560,12 +560,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
- private static final int UUID_LENGTH = 36;
+ public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength;
public ByteBufSimpleStringPool() {
- this.maxLength = UUID_LENGTH;
+ this.maxLength = DEFAULT_MAX_LENGTH;
+ }
+
+ public ByteBufSimpleStringPool(final int capacity) {
+ this(capacity, DEFAULT_MAX_LENGTH);
}
public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
index 124dfcf..bf8df3f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/persistence/Persister.java
@@ -19,20 +19,22 @@ package org.apache.activemq.artemis.core.persistence;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-public interface Persister<T extends Object> {
-
- /** This is to be used to store the protocol-id on Messages.
- * Messages are stored on their bare format.
- * The protocol manager will be responsible to code or decode messages.
- * The caveat here is that the first short-sized bytes need to be this constant. */
+public interface Persister<T extends Object, A> {
+
+ /**
+ * This is to be used to store the protocol-id on Messages.
+ * Messages are stored on their bare format.
+ * The protocol manager will be responsible to code or decode messages.
+ * The caveat here is that the first short-sized bytes need to be this constant.
+ */
default byte getID() {
- return (byte)0;
+ return (byte) 0;
}
int getEncodeSize(T record);
void encode(ActiveMQBuffer buffer, T record);
- T decode(ActiveMQBuffer buffer, T record);
+ T decode(ActiveMQBuffer buffer, A arg);
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 7d8e984..64ff432 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -98,6 +98,7 @@ public final class UUID {
* @param data 16 byte UUID contents
*/
public UUID(final int type, final byte[] data) {
+ assert data.length == 16;
mId = data;
// Type is multiplexed with time_hi:
mId[UUID.INDEX_TYPE] &= (byte) 0x0F;
@@ -108,6 +109,7 @@ public final class UUID {
}
private UUID(final byte[] data) {
+ assert data.length == 16;
mId = data;
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 223da17..df81c28 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -446,17 +446,21 @@ public class TypedProperties {
}
public synchronized void decode(final ByteBuf buffer,
- final TypedPropertiesDecoderPools keyValuePools) {
+ final TypedPropertiesDecoderPools keyValuePools,
+ boolean replaceExisting) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
- properties = null;
- size = 0;
+ if (replaceExisting) {
+ properties = null;
+ size = 0;
+ }
} else {
int numHeaders = buffer.readInt();
-
- //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
- properties = new HashMap<>(numHeaders, 1.0f);
- size = 0;
+ if (replaceExisting || properties == null) {
+ //optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
+ properties = new HashMap<>(numHeaders, 1.0f);
+ }
+ size = properties.size();
for (int i = 0; i < numHeaders; i++) {
final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@@ -529,6 +533,10 @@ public class TypedProperties {
}
}
+ public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
+ decode(buffer, keyValuePools, true);
+ }
+
public void decode(final ByteBuf buffer) {
decode(buffer, null);
}
@@ -1029,12 +1037,16 @@ public class TypedProperties {
public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
- private static final int UUID_LENGTH = 36;
+ public static final int DEFAULT_MAX_LENGTH = 36;
private final int maxLength;
public ByteBufStringValuePool() {
- this.maxLength = UUID_LENGTH;
+ this.maxLength = DEFAULT_MAX_LENGTH;
+ }
+
+ public ByteBufStringValuePool(final int capacity) {
+ this(capacity, DEFAULT_MAX_LENGTH);
}
public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
@@ -1074,9 +1086,9 @@ public class TypedProperties {
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
}
- public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
- this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
- this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
+ public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity) {
+ this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity);
+ this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity);
}
public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1de8150..09c2d39 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -394,7 +394,7 @@ public interface Message {
*/
Message setDurable(boolean durable);
- Persister<Message> getPersister();
+ Persister<Message, CoreMessageObjectPools> getPersister();
String getAddress();
@@ -454,7 +454,7 @@ public interface Message {
void persist(ActiveMQBuffer targetRecord);
- void reloadPersistence(ActiveMQBuffer record);
+ void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
default void releaseBuffer() {
ByteBuf buffer = getBuffer();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9050550..b61b27e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -130,7 +130,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
- public Persister<Message> getPersister() {
+ public Persister<Message, CoreMessageObjectPools> getPersister() {
return CoreMessagePersister.getInstance();
}
@@ -646,11 +646,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
private void decode(boolean beforeAddress) {
+ decode(beforeAddress, coreMessageObjectPools);
+ }
+
+ private void decode(boolean beforeAddress, CoreMessageObjectPools pools) {
endOfBodyPosition = buffer.readInt();
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
- decodeHeadersAndProperties(buffer, true);
+ decodeHeadersAndProperties(buffer, true, pools);
buffer.readerIndex(0);
validBuffer = true;
@@ -662,14 +666,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
public void decodeHeadersAndProperties(final ByteBuf buffer) {
- decodeHeadersAndProperties(buffer, false);
+ decodeHeadersAndProperties(buffer, false, coreMessageObjectPools);
}
- private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
+ private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties, CoreMessageObjectPools pools) {
messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong();
- address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
+ address = SimpleString.readNullableSimpleString(buffer, pools == null ? null : pools.getAddressDecoderPool());
if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
@@ -687,7 +691,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex();
} else {
properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
- properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
+ properties.decode(buffer, pools == null ? null : pools.getPropertiesDecoderPools());
}
}
@@ -1180,11 +1184,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt();
initBuffer(size);
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
- decode(false);
+ decode(false, pools);
}
@Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
index 4c56eac..62ee5ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java
@@ -25,14 +25,30 @@ import java.util.function.Supplier;
public class CoreMessageObjectPools {
- private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
- private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+ private final Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool;
+ private final Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools;
- private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
- private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
- private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
+ private final Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool;
+ private final Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool;
+ private final Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools;
+
+ public CoreMessageObjectPools(int addressPoolCapacity,
+ int groupIdCapacity,
+ int propertyKeyCapacity,
+ int propertyValueCapacity) {
+ addressDecoderPool = Suppliers.memoize(() -> new SimpleString.ByteBufSimpleStringPool(addressPoolCapacity));
+ propertiesDecoderPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesDecoderPools(propertyKeyCapacity, propertyValueCapacity));
+ groupIdStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(groupIdCapacity));
+ addressStringSimpleStringPool = Suppliers.memoize(() -> new SimpleString.StringSimpleStringPool(addressPoolCapacity));
+ propertiesStringSimpleStringPools = Suppliers.memoize(() -> new TypedProperties.TypedPropertiesStringSimpleStringPools(propertyKeyCapacity, propertyValueCapacity));
+ }
public CoreMessageObjectPools() {
+ addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
+ propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
+ groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+ addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
+ propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
}
public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
index cbd565d..3861d67 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessagePersister.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.DataConstants;
-public class CoreMessagePersister implements Persister<Message> {
+public class CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
public static final byte ID = 1;
private static CoreMessagePersister theInstance;
@@ -68,14 +68,18 @@ public class CoreMessagePersister implements Persister<Message> {
record.persist(buffer);
}
-
@Override
- public Message decode(ActiveMQBuffer buffer, Message record) {
+ public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
// the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use
long id = buffer.readLong();
- SimpleString address = buffer.readNullableSimpleString();
- record = new CoreMessage();
- record.reloadPersistence(buffer);
+ final SimpleString address;
+ if (pool == null) {
+ address = buffer.readNullableSimpleString();
+ } else {
+ address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+ }
+ CoreMessage record = new CoreMessage();
+ record.reloadPersistence(buffer, pool);
record.setMessageID(id);
record.setAddress(address);
return record;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index e1dcf97..0f809ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -248,7 +248,7 @@ public class MessageInternalImpl implements MessageInternal {
}
@Override
- public Persister<Message> getPersister() {
+ public Persister<Message, CoreMessageObjectPools> getPersister() {
throw new UnsupportedOperationException();
}
@@ -340,7 +340,7 @@ public class MessageInternalImpl implements MessageInternal {
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
throw new UnsupportedOperationException();
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
index 8fc2a5aa..1e734d3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
/** This is a facade between the new Persister and the former EncodingSupport.
* Methods using the old interface will use this as a facade to provide the previous semantic. */
-public class EncoderPersister implements Persister<EncodingSupport> {
+public class EncoderPersister implements Persister<EncodingSupport, EncodingSupport> {
private static final EncoderPersister theInstance = new EncoderPersister();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index fcfe10e..be758de 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -741,7 +741,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
int size = record.readInt();
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
@@ -771,7 +771,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() {
+ public Persister<org.apache.activemq.artemis.api.core.Message, CoreMessageObjectPools> getPersister() {
return AMQPMessagePersisterV2.getInstance();
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
index c688124..9ab0842 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -62,12 +63,17 @@ public class AMQPMessagePersister extends MessagePersister {
}
@Override
- public Message decode(ActiveMQBuffer buffer, Message record) {
+ public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
long id = buffer.readLong();
long format = buffer.readLong();
- SimpleString address = buffer.readNullableSimpleString();
- record = new AMQPMessage(format);
- record.reloadPersistence(buffer);
+ final SimpleString address;
+ if (pool == null) {
+ address = buffer.readNullableSimpleString();
+ } else {
+ address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
+ }
+ AMQPMessage record = new AMQPMessage(format);
+ record.reloadPersistence(buffer, pool);
record.setMessageID(id);
if (address != null) {
record.setAddress(address);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
index c268694..d263fe9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -68,16 +69,24 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
}
}
-
@Override
- public Message decode(ActiveMQBuffer buffer, Message record) {
- AMQPMessage message = (AMQPMessage)super.decode(buffer, record);
+ public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+ AMQPMessage message = (AMQPMessage) super.decode(buffer, pool);
int size = buffer.readInt();
if (size != 0) {
- TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
- properties.decode(buffer.byteBuf());
- message.setExtraProperties(properties);
+ // message::setAddress could have populated extra properties
+ // hence, we can safely replace the value on the properties
+ // if it has been encoded differently in the rest of the buffer
+ TypedProperties existingExtraProperties = message.getExtraProperties();
+ TypedProperties extraProperties = existingExtraProperties;
+ if (existingExtraProperties == null) {
+ extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
+ }
+ extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
+ if (extraProperties != existingExtraProperties) {
+ message.setExtraProperties(extraProperties);
+ }
}
return message;
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index cd21e46..7470d60 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
@@ -39,7 +40,7 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
@Override
- public Persister<Message>[] getPersister() {
+ public Persister<Message, CoreMessageObjectPools>[] getPersister() {
Persister[] persisters = new Persister[]{AMQPMessagePersister.getInstance(), AMQPMessagePersisterV2.getInstance()};
return persisters;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 88414b4..695bf0f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -147,7 +147,7 @@ public class AMQPMessageTest {
final long persistedSize = (long) encoded.readableBytes();
// Now reload from encoded data
- message.reloadPersistence(encoded);
+ message.reloadPersistence(encoded, null);
assertEquals(persistedSize, message.getPersistSize());
assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 45e8953..9644b70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -140,7 +140,7 @@ public class OpenwireMessage implements Message {
}
@Override
- public Persister<Message> getPersister() {
+ public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}
@@ -205,7 +205,7 @@ public class OpenwireMessage implements Message {
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index effb7e1..296b221 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_MAX_LENGTH;
+import static org.apache.activemq.artemis.api.core.SimpleString.ByteBufSimpleStringPool.DEFAULT_POOL_CAPACITY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
@@ -56,6 +58,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -858,6 +861,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
final MutableLong recordNumber = new MutableLong();
+ final CoreMessageObjectPools pools;
+ if (totalSize > 0) {
+ final int addresses = (int)Math.max(
+ DEFAULT_POOL_CAPACITY,
+ queueInfos == null ? 0 :
+ queueInfos.values().stream()
+ .map(QueueBindingInfo::getAddress)
+ .filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
+ .count() * 2);
+ pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
+ } else {
+ pools = null;
+ }
// This will free up memory sooner while reading the records
records.clear(record -> {
try {
@@ -904,7 +920,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
- Message message = MessagePersister.getInstance().decode(buff, null);
+ Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message);
@@ -1716,6 +1732,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final Set<Pair<Long, Long>> pendingLargeMessages,
JournalLoader journalLoader) throws Exception {
// recover prepared transactions
+ CoreMessageObjectPools pools = null;
+
for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
@@ -1749,7 +1767,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
break;
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
- Message message = MessagePersister.getInstance().decode(buff, null);
+ if (pools == null) {
+ pools = new CoreMessageObjectPools();
+ }
+ Message message = MessagePersister.getInstance().decode(buff, pools);
messages.put(record.id, message);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
index b715f97..4e02f68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java
@@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-public class LargeMessagePersister implements Persister<LargeServerMessage> {
+public class LargeMessagePersister implements Persister<LargeServerMessage, LargeServerMessage> {
private static final LargeMessagePersister theInstance = new LargeMessagePersister();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
index 7590924..6d5e352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -37,7 +38,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
private static final String MODULE_NAME = "artemis-server";
@Override
- public Persister<Message>[] getPersister() {
+ public Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{CoreMessagePersister.getInstance()};
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
index ad1317f..2fddc56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -21,11 +21,12 @@ import java.util.ServiceLoader;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.jboss.logging.Logger;
-public class MessagePersister implements Persister<Message> {
+public class MessagePersister implements Persister<Message, CoreMessageObjectPools> {
private static final Logger logger = Logger.getLogger(MessagePersister.class);
@@ -33,7 +34,7 @@ public class MessagePersister implements Persister<Message> {
/** This will be used for reading messages */
private static final int MAX_PERSISTERS = 3;
- private static final Persister<Message>[] persisters = new Persister[MAX_PERSISTERS];
+ private static final Persister<Message, CoreMessageObjectPools>[] persisters = new Persister[MAX_PERSISTERS];
static {
CoreMessagePersister persister = CoreMessagePersister.getInstance();
@@ -46,7 +47,7 @@ public class MessagePersister implements Persister<Message> {
}
public static void registerProtocol(ProtocolManagerFactory manager) {
- Persister<Message>[] messagePersisters = manager.getPersister();
+ Persister<Message, CoreMessageObjectPools>[] messagePersisters = manager.getPersister();
if (messagePersisters == null || messagePersisters.length == 0) {
logger.debug("Cannot find persister for " + manager);
} else {
@@ -69,7 +70,7 @@ public class MessagePersister implements Persister<Message> {
return persisters[id - 1];
}
- public static void registerPersister(Persister<Message> persister) {
+ public static void registerPersister(Persister<Message, CoreMessageObjectPools> persister) {
if (persister != null) {
assert persister.getID() <= MAX_PERSISTERS : "You must update MessagePersister::MAX_PERSISTERS to a higher number";
persisters[persister.getID() - 1] = persister;
@@ -97,12 +98,12 @@ public class MessagePersister implements Persister<Message> {
}
@Override
- public Message decode(ActiveMQBuffer buffer, Message record) {
+ public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pools) {
byte protocol = buffer.readByte();
- Persister<Message> persister = getPersister(protocol);
+ Persister<Message, CoreMessageObjectPools> persister = getPersister(protocol);
if (persister == null) {
throw new NullPointerException("couldn't find factory for type=" + protocol);
}
- return persister.decode(buffer, record);
+ return persister.decode(buffer, pools);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index 4ab34eb..77c66f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -21,12 +21,13 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
public interface ProtocolManagerFactory<P extends BaseInterceptor> {
- default Persister<Message>[] getPersister() {
+ default Persister<Message, CoreMessageObjectPools>[] getPersister() {
return new Persister[]{};
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b3ae240..d0ed6f8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -323,12 +323,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}
@Override
- public Persister<Message> getPersister() {
+ public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 511d476..dc23bc9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -371,12 +371,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public Persister<Message> getPersister() {
+ public Persister<Message, CoreMessageObjectPools> getPersister() {
return null;
}
@Override
- public void reloadPersistence(ActiveMQBuffer record) {
+ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools) {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
index 7cd4b05..f05552b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
@@ -294,7 +295,7 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
return conf;
}
- static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message> {
+ static class SlowMessagePersister extends CoreMessagePersister implements Persister<Message, CoreMessageObjectPools> {
boolean used = false;
@@ -343,8 +344,8 @@ public class SharedNothingReplicationTest extends ActiveMQTestBase {
}
@Override
- public Message decode(ActiveMQBuffer buffer, Message record) {
- return persister.decode(buffer, record);
+ public Message decode(ActiveMQBuffer buffer, CoreMessageObjectPools pool) {
+ return persister.decode(buffer, pool);
}
}