You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/02/24 15:50:23 UTC
[activemq-artemis] branch master updated: ARTEMIS-3067,
ARTEMIS-3135 - rework accounting for lazy decoding by directly
referencing the owning page store;
the owner is now tracked on the message rather than on the message reference. This
avoids the error-prone checks around potential decoding sites.
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new a0ce381 ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store; the owner is now tracked on the message rather than on the message reference. This avoids the error-prone checks around potential decoding sites.
a0ce381 is described below
commit a0ce3812bae68fdee2354fc265960b63ccc6e08e
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Feb 23 21:11:48 2021 +0000
ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store; the owner is now tracked on the message rather than on the message reference. This avoids the error-prone checks around potential decoding sites.
---
.../apache/activemq/artemis/api/core/Message.java | 3 ++
.../artemis/core/message/impl/CoreMessage.java | 12 ++++++++
.../core/message/impl/MessageInternalImpl.java | 10 +++++++
.../artemis/protocol/amqp/broker/AMQPMessage.java | 33 +++++++++++++++++++++-
.../protocol/amqp/broker/AMQPStandardMessage.java | 13 +--------
.../connect/mirror/AMQPMirrorControllerSource.java | 10 +------
.../core/protocol/openwire/OpenwireMessage.java | 9 ++++++
.../core/management/impl/QueueControlImpl.java | 7 -----
.../core/paging/cursor/PagedReferenceImpl.java | 11 --------
.../core/postoffice/impl/PostOfficeImpl.java | 7 +++--
.../artemis/core/server/MessageReference.java | 8 ++----
.../server/impl/GroupFirstMessageReference.java | 9 ------
.../artemis/core/server/impl/LastValueQueue.java | 10 -------
.../core/server/impl/MessageReferenceImpl.java | 24 +---------------
.../artemis/core/server/impl/QueueImpl.java | 17 +++--------
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 13 +++++++--
.../artemis/tests/util/ActiveMQTestBase.java | 2 +-
.../tests/integration/client/AcknowledgeTest.java | 9 ++++++
.../tests/integration/paging/GlobalPagingTest.java | 2 +-
.../core/server/impl/QueueConcurrentTest.java | 2 +-
21 files changed, 103 insertions(+), 110 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 56a7cfc..3d65ab7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -758,4 +758,7 @@ public interface Message {
*/
long getPersistentSize() throws ActiveMQException;
+ Object getOwner();
+
+ void setOwner(Object object);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 781e59e..8ae9543 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -101,6 +101,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private final CoreMessageObjectPools coreMessageObjectPools;
+ private volatile Object owner;
+
public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
this.coreMessageObjectPools = coreMessageObjectPools;
}
@@ -1259,4 +1261,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public long getPersistentSize() throws ActiveMQException {
return getEncodeSize();
}
+
+ @Override
+ public Object getOwner() {
+ return owner;
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ this.owner = object;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index 1095609..936e093 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -709,4 +709,14 @@ public class MessageInternalImpl implements MessageInternal {
return message.getPersistentSize();
}
+ @Override
+ public Object getOwner() {
+ return message.getOwner();
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ message.setOwner(object);
+ }
+
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 4740166..25e983c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -213,6 +214,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
// These are properties set at the broker level and carried only internally by broker storage.
protected volatile TypedProperties extraProperties;
+ private volatile Object owner;
+
/**
* Creates a new {@link AMQPMessage} instance from binary encoded message data.
*
@@ -490,12 +493,30 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
- memoryEstimate = -1;
+ if (owner != null && memoryEstimate != -1) {
+ // the memory has already been tracked and needs to be updated to reflect the new decoding
+ int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
+ ((PagingStore)owner).addSize(addition);
+ final int updatedEstimate = memoryEstimate + addition;
+ memoryEstimate = updatedEstimate;
+ }
}
return applicationProperties;
}
+ protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
+ if (applicationProperties != null) {
+ // they have been unmarshalled, estimate memory usage based on their encoded size
+ if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+ return remainingBodyPosition - applicationPropertiesPosition;
+ } else {
+ return data.capacity() - applicationPropertiesPosition;
+ }
+ }
+ return 0;
+ }
+
@SuppressWarnings("unchecked")
protected Map<String, Object> getApplicationPropertiesMap(boolean createIfAbsent) {
ApplicationProperties appMap = lazyDecodeApplicationProperties();
@@ -1692,4 +1713,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
+
+ @Override
+ public Object getOwner() {
+ return owner;
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ this.owner = object;
+ }
}
\ No newline at end of file
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index ce2e374..80358aa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -187,23 +187,12 @@ public class AMQPStandardMessage extends AMQPMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
- memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData() : 0);
+ memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
}
return memoryEstimate;
}
- private int unmarshalledApplicationPropertiesMemoryEstimateFromData() {
- if (applicationProperties != null) {
- // they have been unmarshalled, estimate memory usage based on their encoded size
- if (remainingBodyPosition != VALUE_NOT_PRESENT) {
- return remainingBodyPosition - applicationPropertiesPosition;
- } else {
- return data.capacity() - applicationPropertiesPosition;
- }
- }
- return 0;
- }
@Override
public void persist(ActiveMQBuffer targetRecord) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index bb44830..86dda6d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -153,15 +152,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
try {
context.setReusable(false);
- PagingStore storeOwner = null;
- if (refs.size() > 0) {
- storeOwner = refs.get(0).getOwner();
- }
- if (storeOwner != null && !storeOwner.getAddress().equals(message.getAddressSimpleString())) {
- storeOwner = server.getPagingManager().getPageStore(message.getAddressSimpleString());
- }
- MessageReference ref = MessageReference.Factory.createReference(message, snfQueue, storeOwner);
+ MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
snfQueue.refUp(ref);
Map<Symbol, Object> daMap = new HashMap<>();
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 0279b5b..1d88a40 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -499,6 +499,15 @@ public class OpenwireMessage implements Message {
}
@Override
+ public Object getOwner() {
+ return null;
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ }
+
+ @Override
public int getUsage() {
return 0;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cec65b2..5fb5a7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -793,9 +792,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
int i = 0;
for (MessageReference ref : refs) {
Message message = ref.getMessage();
- final int currentMemoryEstimate = message.getMemoryEstimate();
messages[i++] = message.toMap();
- MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages;
}
@@ -856,9 +853,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
- final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
- MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
}
} catch (NoSuchElementException ignored) {
@@ -903,9 +898,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
Message message = ref.getMessage();
- final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
- MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages.toArray(new Map[1]);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 27e6167..76f5a05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -23,7 +23,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -407,16 +406,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
}
@Override
- public PagingStore getOwner() {
- return null;
- }
-
- @Override
- public void setOwner(PagingStore owner) {
-
- }
-
- @Override
public boolean isDurable() {
if (durable == UNDEFINED_IS_DURABLE) {
durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index e20673e..b027c27 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1244,7 +1244,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
- MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
Long scheduledDeliveryTime;
if (message.hasScheduledDeliveryTime()) {
@@ -1499,6 +1499,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
+ message.setOwner(owningStore);
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store;
if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
@@ -1518,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
for (Queue queue : entry.getValue().getNonDurableQueues()) {
- MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (deliveryTime != null) {
reference.setScheduledDeliveryTime(deliveryTime);
@@ -1533,7 +1534,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
while (iter.hasNext()) {
Queue queue = iter.next();
- MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
+ MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
reference.setAlreadyAcked();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 6765e4f..4594fcd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -22,7 +22,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -35,8 +34,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public interface MessageReference {
final class Factory {
- public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
- return new MessageReferenceImpl(encode, queue, pageStore);
+ public static MessageReference createReference(Message encode, final Queue queue) {
+ return new MessageReferenceImpl(encode, queue);
}
}
boolean isPaged();
@@ -138,7 +137,4 @@ public interface MessageReference {
*/
long getPersistentSize() throws ActiveMQException;
- PagingStore getOwner();
-
- void setOwner(PagingStore owner);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index 6e2f1fc..c0b0e02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -218,13 +218,4 @@ public class GroupFirstMessageReference implements MessageReference {
return messageReference.getPersistentSize();
}
- @Override
- public PagingStore getOwner() {
- return this.owner;
- }
-
- @Override
- public void setOwner(PagingStore owner) {
- this.owner = owner;
- }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 02031e7..4305585 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -570,16 +570,6 @@ public class LastValueQueue extends QueueImpl {
return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
}
- @Override
- public PagingStore getOwner() {
- return ref.getOwner();
- }
-
- @Override
- public void setOwner(PagingStore owner) {
-
- ref.setOwner(owner);
- }
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index dfbeb87..4938a1b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -23,7 +23,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -36,7 +35,6 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
- private volatile PagingStore owner;
public static Comparator<MessageReference> getIDComparator() {
return idComparator;
@@ -107,15 +105,13 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
this.queue = queue;
- this.owner = other.owner;
}
- public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
+ public MessageReferenceImpl(final Message message, final Queue queue) {
this.message = message;
this.queue = queue;
- this.owner = owner;
}
// MessageReference implementation -------------------------------
@@ -179,15 +175,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return MessageReferenceImpl.memoryOffset;
}
- public static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
- final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
- if (delta > 0) {
- PagingStore pageStore = ref.getOwner();
- if (pageStore != null) {
- pageStore.addSize(delta);
- }
- }
- }
@Override
public int getDeliveryCount() {
@@ -367,13 +354,4 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return this.getMessage().getPersistentSize();
}
- @Override
- public PagingStore getOwner() {
- return this.owner;
- }
-
- @Override
- public void setOwner(PagingStore owner) {
- this.owner = owner;
- }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 61f1475..53eaafb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1005,8 +1005,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void refUp(MessageReference messageReference) {
int count = messageReference.getMessage().refUp();
if (count == 1) {
- if (messageReference.getOwner() != null) {
- messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
+ if (messageReference.getMessage().getOwner() != null) {
+ ((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate());
}
}
if (pagingStore != null) {
@@ -1018,8 +1018,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void refDown(MessageReference messageReference) {
int count = messageReference.getMessage().refDown();
if (count == 0) {
- if (messageReference.getOwner() != null) {
- messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
+ if (messageReference.getMessage().getOwner() != null) {
+ ((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate());
}
}
if (pagingStore != null) {
@@ -3071,9 +3071,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
proceedDeliver(handledconsumer, ref);
}
- if (existingMemoryEstimate > 0 ) {
- MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
- }
}
return true;
@@ -3697,13 +3694,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer;
}
- // filter evaluation may cause properties to be lazyDecoded
- final int existingMemoryEstimate = ref.getMessageMemoryEstimate();
-
HandleStatus status = handle(ref, consumer);
-
- MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
-
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (redistributor == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f7dbf9a..10b131a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -644,7 +644,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
- MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
+ MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
reference.setDeliveryCount(0);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 0fcd2b4..3a150c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -251,7 +251,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextMessageID,
long nextScheduledTime,
boolean tail) {
- MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
+ MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.addInPlace(nextScheduledTime, refImpl, tail);
}
@@ -261,7 +261,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextScheduledTime,
boolean tail,
Queue queue) {
- MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
+ MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.checkAndSchedule(refImpl, tail);
}
@@ -810,6 +810,15 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
+ @Override
+ public Object getOwner() {
+ return null;
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ }
+
}
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 85dd47f..adfceee 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2150,7 +2150,7 @@ public abstract class ActiveMQTestBase extends Assert {
protected MessageReference generateReference(final Queue queue, final long id) {
Message message = generateMessage(id);
- return MessageReference.Factory.createReference(message, queue, null);
+ return MessageReference.Factory.createReference(message, queue);
}
protected int calculateRecordSize(final int size, final int alignment) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 310d03d..6b6eb10 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -839,5 +839,14 @@ public class AcknowledgeTest extends ActiveMQTestBase {
public long getPersistentSize() throws ActiveMQException {
return 0;
}
+
+ @Override
+ public Object getOwner() {
+ return null;
+ }
+
+ @Override
+ public void setOwner(Object object) {
+ }
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index 97c941e..3dee2b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
int id = 1000;
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
final Queue queue = server.locateQueue(replyQueue);
- final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
+ final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
reference.getMessage().setMessageID(id++);
//it will cause QueueImpl::directDeliver -> false
queue.addHead(reference, false);
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
index 3cf0cae..6d73cfd 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
@@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
while (System.currentTimeMillis() - start < testTime) {
Message message = generateMessage(i);
- MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
+ MessageReference ref = MessageReference.Factory.createReference(message, queue);
queue.addTail(ref, false);