You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/02/24 15:50:23 UTC

[activemq-artemis] branch master updated: ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store, owner now tracked on a message rather than the message reference. This avoids the error prone checks around potential decoding sites

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a0ce381  ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store, owner now tracked on a message rather than the message reference. This avoids the error prone checks around potential decoding sites
a0ce381 is described below

commit a0ce3812bae68fdee2354fc265960b63ccc6e08e
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Feb 23 21:11:48 2021 +0000

    ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store, owner now tracked on a message rather than the message reference. This avoids the error prone checks around potential decoding sites
---
 .../apache/activemq/artemis/api/core/Message.java  |  3 ++
 .../artemis/core/message/impl/CoreMessage.java     | 12 ++++++++
 .../core/message/impl/MessageInternalImpl.java     | 10 +++++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 33 +++++++++++++++++++++-
 .../protocol/amqp/broker/AMQPStandardMessage.java  | 13 +--------
 .../connect/mirror/AMQPMirrorControllerSource.java | 10 +------
 .../core/protocol/openwire/OpenwireMessage.java    |  9 ++++++
 .../core/management/impl/QueueControlImpl.java     |  7 -----
 .../core/paging/cursor/PagedReferenceImpl.java     | 11 --------
 .../core/postoffice/impl/PostOfficeImpl.java       |  7 +++--
 .../artemis/core/server/MessageReference.java      |  8 ++----
 .../server/impl/GroupFirstMessageReference.java    |  9 ------
 .../artemis/core/server/impl/LastValueQueue.java   | 10 -------
 .../core/server/impl/MessageReferenceImpl.java     | 24 +---------------
 .../artemis/core/server/impl/QueueImpl.java        | 17 +++--------
 .../core/server/impl/ServerConsumerImpl.java       |  2 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 13 +++++++--
 .../artemis/tests/util/ActiveMQTestBase.java       |  2 +-
 .../tests/integration/client/AcknowledgeTest.java  |  9 ++++++
 .../tests/integration/paging/GlobalPagingTest.java |  2 +-
 .../core/server/impl/QueueConcurrentTest.java      |  2 +-
 21 files changed, 103 insertions(+), 110 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 56a7cfc..3d65ab7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -758,4 +758,7 @@ public interface Message {
     */
    long getPersistentSize() throws ActiveMQException;
 
+   Object getOwner();
+
+   void setOwner(Object object);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 781e59e..8ae9543 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -101,6 +101,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    private final CoreMessageObjectPools coreMessageObjectPools;
 
+   private volatile Object owner;
+
    public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
       this.coreMessageObjectPools = coreMessageObjectPools;
    }
@@ -1259,4 +1261,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    public long getPersistentSize() throws ActiveMQException {
       return getEncodeSize();
    }
+
+   @Override
+   public Object getOwner() {
+      return owner;
+   }
+
+   @Override
+   public void setOwner(Object object) {
+      this.owner = object;
+   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index 1095609..936e093 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -709,4 +709,14 @@ public class MessageInternalImpl implements MessageInternal {
       return message.getPersistentSize();
    }
 
+   @Override
+   public Object getOwner() {
+      return message.getOwner();
+   }
+
+   @Override
+   public void setOwner(Object object) {
+      message.setOwner(object);
+   }
+
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 4740166..25e983c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -213,6 +214,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
    // These are properties set at the broker level and carried only internally by broker storage.
    protected volatile TypedProperties extraProperties;
 
+   private volatile Object owner;
+
    /**
     * Creates a new {@link AMQPMessage} instance from binary encoded message data.
     *
@@ -490,12 +493,30 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
    protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
       if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
          applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
-         memoryEstimate = -1;
+         if (owner != null && memoryEstimate != -1) {
+            // the memory has already been tracked and needs to be updated to reflect the new decoding
+            int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
+            ((PagingStore)owner).addSize(addition);
+            final int updatedEstimate = memoryEstimate + addition;
+            memoryEstimate = updatedEstimate;
+         }
       }
 
       return applicationProperties;
    }
 
+   protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
+      if (applicationProperties != null) {
+         // they have been unmarshalled, estimate memory usage based on their encoded size
+         if (remainingBodyPosition != VALUE_NOT_PRESENT) {
+            return remainingBodyPosition - applicationPropertiesPosition;
+         } else {
+            return data.capacity() - applicationPropertiesPosition;
+         }
+      }
+      return 0;
+   }
+
    @SuppressWarnings("unchecked")
    protected Map<String, Object> getApplicationPropertiesMap(boolean createIfAbsent) {
       ApplicationProperties appMap = lazyDecodeApplicationProperties();
@@ -1692,4 +1713,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
    protected SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
+
+   @Override
+   public Object getOwner() {
+      return owner;
+   }
+
+   @Override
+   public void setOwner(Object object) {
+      this.owner = object;
+   }
 }
\ No newline at end of file
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index ce2e374..80358aa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -187,23 +187,12 @@ public class AMQPStandardMessage extends AMQPMessage {
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
-         memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData() : 0);
+         memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
       }
 
       return memoryEstimate;
    }
 
-   private int unmarshalledApplicationPropertiesMemoryEstimateFromData() {
-      if (applicationProperties != null) {
-         // they have been unmarshalled, estimate memory usage based on their encoded size
-         if (remainingBodyPosition != VALUE_NOT_PRESENT) {
-            return remainingBodyPosition - applicationPropertiesPosition;
-         } else {
-            return data.capacity() - applicationPropertiesPosition;
-         }
-      }
-      return 0;
-   }
 
    @Override
    public void persist(ActiveMQBuffer targetRecord) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index bb44830..86dda6d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -153,15 +152,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
 
       try {
          context.setReusable(false);
-         PagingStore storeOwner = null;
-         if (refs.size() > 0) {
-            storeOwner = refs.get(0).getOwner();
-         }
-         if (storeOwner != null && !storeOwner.getAddress().equals(message.getAddressSimpleString())) {
-            storeOwner = server.getPagingManager().getPageStore(message.getAddressSimpleString());
-         }
-         MessageReference ref = MessageReference.Factory.createReference(message, snfQueue, storeOwner);
 
+         MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
          snfQueue.refUp(ref);
 
          Map<Symbol, Object> daMap = new HashMap<>();
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 0279b5b..1d88a40 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -499,6 +499,15 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
+   public Object getOwner() {
+      return null;
+   }
+
+   @Override
+   public void setOwner(Object object) {
+   }
+
+   @Override
    public int getUsage() {
       return 0;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cec65b2..5fb5a7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -793,9 +792,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       int i = 0;
       for (MessageReference ref : refs) {
          Message message = ref.getMessage();
-         final int currentMemoryEstimate = message.getMemoryEstimate();
          messages[i++] = message.toMap();
-         MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
       }
       return messages;
    }
@@ -856,9 +853,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                   MessageReference ref = iterator.next();
                   if (filter == null || filter.match(ref.getMessage())) {
                      Message message = ref.getMessage();
-                     final int currentMemoryEstimate = message.getMemoryEstimate();
                      messages.add(message.toMap());
-                     MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
                   }
                }
             } catch (NoSuchElementException ignored) {
@@ -903,9 +898,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
             if (iterator.hasNext()) {
                MessageReference ref = iterator.next();
                Message message = ref.getMessage();
-               final int currentMemoryEstimate = message.getMemoryEstimate();
                messages.add(message.toMap());
-               MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
             }
             return messages.toArray(new Map[1]);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 27e6167..76f5a05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -23,7 +23,6 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -407,16 +406,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
-   public PagingStore getOwner() {
-      return null;
-   }
-
-   @Override
-   public void setOwner(PagingStore owner) {
-
-   }
-
-   @Override
    public boolean isDurable() {
       if (durable == UNDEFINED_IS_DURABLE) {
          durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index e20673e..b027c27 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1244,7 +1244,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    @Override
    public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
 
-      MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
+      MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
       Long scheduledDeliveryTime;
       if (message.hasScheduledDeliveryTime()) {
@@ -1499,6 +1499,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
 
       PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
+      message.setOwner(owningStore);
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
          PagingStore store;
          if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
@@ -1518,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
 
          for (Queue queue : entry.getValue().getNonDurableQueues()) {
-            MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
             if (deliveryTime != null) {
                reference.setScheduledDeliveryTime(deliveryTime);
@@ -1533,7 +1534,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          while (iter.hasNext()) {
             Queue queue = iter.next();
 
-            MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
             if (context.isAlreadyAcked(context.getAddress(message), queue)) {
                reference.setAlreadyAcked();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 6765e4f..4594fcd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -22,7 +22,6 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -35,8 +34,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 public interface MessageReference {
 
    final class Factory {
-      public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
-         return new MessageReferenceImpl(encode, queue, pageStore);
+      public static MessageReference createReference(Message encode, final Queue queue) {
+         return new MessageReferenceImpl(encode, queue);
       }
    }
    boolean isPaged();
@@ -138,7 +137,4 @@ public interface MessageReference {
     */
    long getPersistentSize() throws ActiveMQException;
 
-   PagingStore getOwner();
-
-   void setOwner(PagingStore owner);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index 6e2f1fc..c0b0e02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -218,13 +218,4 @@ public class GroupFirstMessageReference implements MessageReference {
       return messageReference.getPersistentSize();
    }
 
-   @Override
-   public PagingStore getOwner() {
-      return this.owner;
-   }
-
-   @Override
-   public void setOwner(PagingStore owner) {
-      this.owner = owner;
-   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 02031e7..4305585 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -570,16 +570,6 @@ public class LastValueQueue extends QueueImpl {
          return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
       }
 
-      @Override
-      public PagingStore getOwner() {
-         return ref.getOwner();
-      }
-
-      @Override
-      public void setOwner(PagingStore owner) {
-
-         ref.setOwner(owner);
-      }
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index dfbeb87..4938a1b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -23,7 +23,6 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -36,7 +35,6 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
 
    private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
-   private volatile PagingStore owner;
 
    public static Comparator<MessageReference> getIDComparator() {
       return idComparator;
@@ -107,15 +105,13 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
       this.queue = queue;
 
-      this.owner = other.owner;
    }
 
-   public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
+   public MessageReferenceImpl(final Message message, final Queue queue) {
       this.message = message;
 
       this.queue = queue;
 
-      this.owner = owner;
    }
 
    // MessageReference implementation -------------------------------
@@ -179,15 +175,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
       return MessageReferenceImpl.memoryOffset;
    }
 
-   public static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
-      final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
-      if (delta > 0) {
-         PagingStore pageStore = ref.getOwner();
-         if (pageStore != null) {
-            pageStore.addSize(delta);
-         }
-      }
-   }
 
    @Override
    public int getDeliveryCount() {
@@ -367,13 +354,4 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
       return this.getMessage().getPersistentSize();
    }
 
-   @Override
-   public PagingStore getOwner() {
-      return this.owner;
-   }
-
-   @Override
-   public void setOwner(PagingStore owner) {
-      this.owner = owner;
-   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 61f1475..53eaafb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1005,8 +1005,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void refUp(MessageReference messageReference) {
       int count = messageReference.getMessage().refUp();
       if (count == 1) {
-         if (messageReference.getOwner() != null) {
-            messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
+         if (messageReference.getMessage().getOwner() != null) {
+            ((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate());
          }
       }
       if (pagingStore != null) {
@@ -1018,8 +1018,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void refDown(MessageReference messageReference) {
       int count = messageReference.getMessage().refDown();
       if (count == 0) {
-         if (messageReference.getOwner() != null) {
-            messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
+         if (messageReference.getMessage().getOwner() != null) {
+            ((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate());
          }
       }
       if (pagingStore != null) {
@@ -3071,9 +3071,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             proceedDeliver(handledconsumer, ref);
          }
 
-         if (existingMemoryEstimate > 0 ) {
-            MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
-         }
       }
 
       return true;
@@ -3697,13 +3694,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                consumer = groupConsumer;
             }
 
-            // filter evaluation may cause properties to be lazyDecoded
-            final int existingMemoryEstimate = ref.getMessageMemoryEstimate();
-
             HandleStatus status = handle(ref, consumer);
-
-            MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
-
             if (status == HandleStatus.HANDLED) {
                final MessageReference reference;
                if (redistributor == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f7dbf9a..10b131a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -644,7 +644,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    public void forceDelivery(final long sequence)  {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
-         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
+         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
          reference.setDeliveryCount(0);
 
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 0fcd2b4..3a150c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -251,7 +251,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
                            long nextMessageID,
                            long nextScheduledTime,
                            boolean tail) {
-      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
+      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
       refImpl.setScheduledDeliveryTime(nextScheduledTime);
       handler.addInPlace(nextScheduledTime, refImpl, tail);
    }
@@ -261,7 +261,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
                                  long nextScheduledTime,
                                  boolean tail,
                                  Queue queue) {
-      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
+      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
       refImpl.setScheduledDeliveryTime(nextScheduledTime);
       handler.checkAndSchedule(refImpl, tail);
    }
@@ -810,6 +810,15 @@ public class ScheduledDeliveryHandlerTest extends Assert {
          return 0;
       }
 
+      @Override
+      public Object getOwner() {
+         return null;
+      }
+
+      @Override
+      public void setOwner(Object object) {
+      }
+
    }
 
    public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 85dd47f..adfceee 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2150,7 +2150,7 @@ public abstract class ActiveMQTestBase extends Assert {
    protected MessageReference generateReference(final Queue queue, final long id) {
       Message message = generateMessage(id);
 
-      return MessageReference.Factory.createReference(message, queue, null);
+      return MessageReference.Factory.createReference(message, queue);
    }
 
    protected int calculateRecordSize(final int size, final int alignment) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 310d03d..6b6eb10 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -839,5 +839,14 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       public long getPersistentSize() throws ActiveMQException {
          return 0;
       }
+
+      @Override
+      public Object getOwner() {
+         return null;
+      }
+
+      @Override
+      public void setOwner(Object object) {
+      }
    }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index 97c941e..3dee2b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
                int id = 1000;
                try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
                   final Queue queue = server.locateQueue(replyQueue);
-                  final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
+                  final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
                   reference.getMessage().setMessageID(id++);
                   //it will cause QueueImpl::directDeliver -> false
                   queue.addHead(reference, false);
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
index 3cf0cae..6d73cfd 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
@@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
          while (System.currentTimeMillis() - start < testTime) {
             Message message = generateMessage(i);
 
-            MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
+            MessageReference ref = MessageReference.Factory.createReference(message, queue);
 
             queue.addTail(ref, false);