You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/07/13 04:01:23 UTC

[activemq-artemis] branch main updated: ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a4765c39c ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl
4a4765c39c is described below

commit 4a4765c39cb73438ea2199b6e0937566d3556c10
Author: Gary Tully <ga...@gmail.com>
AuthorDate: Tue Jun 7 10:49:36 2022 -0500

    ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl
---
 .../artemis/utils/collections/LinkedListImpl.java  |   4 +
 .../utils/collections/PriorityLinkedListImpl.java  |   2 +-
 .../artemis/core/server/impl/LastValueQueue.java   | 437 +++------------------
 .../core/server/impl/QueueConsumersImpl.java       |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        |  72 ++--
 .../artemis/tests/integration/amqp/JMSLVQTest.java |  32 ++
 .../integration/amqp/JMSNonDestructiveTest.java    |   3 +
 .../tests/integration/jms/client/LVQTest.java      |   4 +-
 .../tests/integration/server/LVQRecoveryTest.java  |   2 +-
 .../artemis/tests/integration/server/LVQTest.java  |  29 +-
 .../artemis/tests/unit/util/LinkedListTest.java    |  35 ++
 11 files changed, 183 insertions(+), 439 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 9f12b47246..4f3b8e9dc9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -538,6 +538,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
                   current = current.prev;
 
                   current.iterCount++;
+
+                  if (last == node) {
+                     last = current;
+                  }
                } else {
                   current = null;
                }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
index f1e50d3261..4d01441c9b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
@@ -30,7 +30,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
 
    private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");
 
-   protected LinkedListImpl<E>[] levels;
+   protected final LinkedListImpl<E>[] levels;
 
    private volatile int size;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index f01403fb90..50dc850c09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,16 +17,11 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -38,13 +33,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.jboss.logging.Logger;
 
 /**
@@ -59,25 +54,9 @@ import org.jboss.logging.Logger;
 public class LastValueQueue extends QueueImpl {
 
    private static final Logger logger = Logger.getLogger(LastValueQueue.class);
-   private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
+   private final Map<SimpleString, MessageReference> map = new HashMap<>();
    private final SimpleString lastValueKey;
 
-   // only use this within synchronized methods or synchronized(this) blocks
-   protected final LinkedList<MessageReference> nextDeliveries = new LinkedList<>();
-
-
-   /* in certain cases we need to redeliver a message */
-   @Override
-   protected MessageReference nextDelivery() {
-      return nextDeliveries.poll();
-   }
-
-   @Override
-   protected void repeatNextDelivery(MessageReference reference) {
-      // put the ref back onto the head of the list so that the next time poll() is called this ref is returned
-      nextDeliveries.addFirst(reference);
-   }
-
 
    @Deprecated
    public LastValueQueue(final long persistenceID,
@@ -162,116 +141,49 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addTail(final MessageReference ref, final boolean direct) {
-      if (scheduleIfPossible(ref)) {
-         return;
-      }
-      final SimpleString prop = ref.getLastValueProperty();
-
-      if (prop != null) {
-         HolderReference hr = map.get(prop);
-
-         if (hr != null) {
-            if (isNonDestructive() && hr.isInDelivery()) {
-               // if the ref is already being delivered we'll do the replace in the postAcknowledge
-               hr.setReplacementRef(ref);
-            } else {
-               // We need to overwrite the old ref with the new one and ack the old one
-               replaceLVQMessage(ref, hr);
-
-               if (isNonDestructive() && hr.isDelivered()) {
-                  hr.resetDelivered();
-                  // since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement
-                  nextDeliveries.add(hr);
-                  deliverAsync();
-               }
-            }
-         } else {
-            hr = new HolderReference(prop, ref);
-
-            map.put(prop, hr);
-
-            super.addTail(hr, isNonDestructive() ? false : direct);
-         }
-      } else {
+      if (!scheduleIfPossible(ref)) {
+         trackLastValue(ref);
          super.addTail(ref, isNonDestructive() ? false : direct);
       }
    }
 
-
    @Override
-   public long getMessageCount() {
-      if (pageSubscription != null) {
-         // messageReferences will have depaged messages which we need to discount from the counter as they are
-         // counted on the pageSubscription as well
-         return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
-      } else {
-         return (long) pendingMetrics.getMessageCount() + getScheduledCount();
-      }
+   public void addHead(final MessageReference ref, boolean scheduling) {
+      if (scheduling) {
+         // track last value when scheduled message is actually enqueued
+         trackLastValue(ref);
+      } else if (isNonDestructive() == false) {
+         // for released messages from a consumer or tx that have been destroyed,
+         // use as a last value only in the absence of any newer value, since it may be stale
+         trackLastValueIfAbsent(ref);
+      }
+      super.addHead(ref, scheduling);
    }
 
-   /** LVQ has to use regular addHead due to last value queues calculations */
    @Override
-   public void addSorted(MessageReference ref, boolean scheduling) {
-      this.addHead(ref, scheduling);
+   public void addSorted(final MessageReference ref, boolean scheduling) {
+      addHead(ref, scheduling);
    }
 
-   /** LVQ has to use regular addHead due to last value queues calculations */
-   @Override
-   public void addSorted(List<MessageReference> refs, boolean scheduling) {
-      this.addHead(refs, scheduling);
-   }
-
-   @Override
-   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
-      // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
-      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
-         return;
+   private void trackLastValue(MessageReference ref) {
+      final SimpleString lastValueProperty = ref.getLastValueProperty();
+      if (lastValueProperty != null) {
+         map.put(lastValueProperty, ref);
       }
+   }
 
-      SimpleString lastValueProp = ref.getLastValueProperty();
-
-      if (lastValueProp != null) {
-         HolderReference hr = map.get(lastValueProp);
-
-         if (hr != null) {
-            if (scheduling) {
-               // We need to overwrite the old ref with the new one and ack the old one
-
-               replaceLVQMessage(ref, hr);
-            } else {
-               // We keep the current ref and ack the one we are returning
-
-               super.referenceHandled(ref);
-
-               try {
-                  super.acknowledge(ref);
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
-               }
-            }
-         } else {
-            hr = new HolderReference(lastValueProp, ref);
-
-            map.put(lastValueProp, hr);
-
-            super.addHead(hr, scheduling);
-         }
-      } else {
-         super.addHead(ref, scheduling);
+   private void trackLastValueIfAbsent(MessageReference ref) {
+      final SimpleString lastValueProperty = ref.getLastValueProperty();
+      if (lastValueProperty != null) {
+         map.putIfAbsent(lastValueProperty, ref);
       }
    }
 
    @Override
-   public void postAcknowledge(final MessageReference ref, AckReason reason) {
-      if (isNonDestructive()) {
-         if (ref instanceof HolderReference) {
-            HolderReference hr = (HolderReference) ref;
-            if (hr.getReplacementRef() != null) {
-               replaceLVQMessage(hr.getReplacementRef(), hr);
-            }
-         }
-      }
-      super.postAcknowledge(ref, reason);
+   public long getMessageCount() {
+      // with LV - delivered messages can remain on the queue so the delivering count
+      // must be discounted else we are accounting the same message more than once
+      return super.getMessageCount() - getDeliveringCount();
    }
 
    @Override
@@ -284,21 +196,36 @@ public class LastValueQueue extends QueueImpl {
       return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
    }
 
-   private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
-      MessageReference oldRef = hr.getReference();
-
-      referenceHandled(oldRef);
-      super.refRemoved(oldRef);
-
-      try {
-         oldRef.acknowledge(null, AckReason.REPLACED, null);
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+   @Override
+   protected void pruneLastValues() {
+      // called with synchronized(this) from super.deliver()
+      try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
+         while (iter.hasNext()) {
+            MessageReference ref = iter.next();
+            if (!currentLastValue(ref)) {
+               iter.remove();
+               try {
+                  referenceHandled(ref);
+                  super.refRemoved(ref);
+                  ref.acknowledge(null, AckReason.REPLACED, null);
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+               }
+            }
+         }
       }
+   }
 
-      hr.setReference(ref);
-      addRefSize(ref);
-      refAdded(ref);
+   private boolean currentLastValue(final MessageReference ref) {
+      boolean currentLastValue = false;
+      SimpleString lastValueProp = ref.getLastValueProperty();
+      if (lastValueProp != null) {
+         MessageReference current = map.get(lastValueProp);
+         if (current == ref) {
+            currentLastValue = true;
+         }
+      }
+      return currentLastValue;
    }
 
    @Override
@@ -327,16 +254,9 @@ public class LastValueQueue extends QueueImpl {
    }
 
    @Override
-   public synchronized void reload(final MessageReference ref) {
-      // repopulate LVQ map & reload proper HolderReferences
-      SimpleString lastValueProp = ref.getLastValueProperty();
-      if (lastValueProp != null) {
-         HolderReference hr = new HolderReference(lastValueProp, ref);
-         map.put(lastValueProp, hr);
-         super.reload(hr);
-      } else {
-         super.reload(ref);
-      }
+   public synchronized void reload(final MessageReference newRef) {
+      trackLastValue(newRef);
+      super.reload(newRef);
    }
 
    private synchronized void removeIfCurrent(MessageReference ref) {
@@ -361,9 +281,6 @@ public class LastValueQueue extends QueueImpl {
       };
    }
 
-
-
-
    @Override
    public boolean isLastValue() {
       return true;
@@ -378,238 +295,6 @@ public class LastValueQueue extends QueueImpl {
       return Collections.unmodifiableSet(map.keySet());
    }
 
-   private static class HolderReference implements MessageReference {
-
-      private final SimpleString prop;
-
-      private volatile boolean delivered = false;
-
-      private volatile MessageReference ref;
-
-      private volatile MessageReference replacementRef;
-
-      private long consumerID;
-
-      private boolean hasConsumerID = false;
-
-
-      public MessageReference getReplacementRef() {
-         return replacementRef;
-      }
-
-      public void setReplacementRef(MessageReference replacementRef) {
-         this.replacementRef = replacementRef;
-      }
-
-      public void resetDelivered() {
-         delivered = false;
-      }
-
-      public boolean isDelivered() {
-         return delivered;
-      }
-
-      HolderReference(final SimpleString prop, final MessageReference ref) {
-         this.prop = prop;
-
-         this.ref = ref;
-      }
-
-      @Override
-      public void onDelivery(Consumer<? super MessageReference> callback) {
-         // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
-      }
-
-      MessageReference getReference() {
-         return ref;
-      }
-
-      @Override
-      public void handled() {
-         delivered = true;
-         // We need to remove the entry from the map just before it gets delivered
-         ref.handled();
-         if (!ref.getQueue().isNonDestructive()) {
-            ((LastValueQueue) ref.getQueue()).removeIfCurrent(this);
-         }
-      }
-
-      @Override
-      public void setInDelivery(boolean inDelivery) {
-         ref.setInDelivery(inDelivery);
-      }
-
-      @Override
-      public boolean isInDelivery() {
-         return ref.isInDelivery();
-      }
-
-      @Override
-      public Object getProtocolData() {
-         return ref.getProtocolData();
-      }
-
-      @Override
-      public void setProtocolData(Object data) {
-         ref.setProtocolData(data);
-      }
-
-      @Override
-      public void setAlreadyAcked() {
-         ref.setAlreadyAcked();
-      }
-
-      @Override
-      public boolean isAlreadyAcked() {
-         return ref.isAlreadyAcked();
-      }
-
-      void setReference(final MessageReference ref) {
-         this.ref = ref;
-      }
-
-      @Override
-      public MessageReference copy(final Queue queue) {
-         return ref.copy(queue);
-      }
-
-      @Override
-      public void decrementDeliveryCount() {
-         ref.decrementDeliveryCount();
-      }
-
-      @Override
-      public int getDeliveryCount() {
-         return ref.getDeliveryCount();
-      }
-
-      @Override
-      public Message getMessage() {
-         return ref.getMessage();
-      }
-
-      @Override
-      public long getMessageID() {
-         return ref.getMessageID();
-      }
-
-      @Override
-      public boolean isDurable() {
-         return getMessage().isDurable();
-      }
-
-      @Override
-      public SimpleString getLastValueProperty() {
-         return prop;
-      }
-
-      @Override
-      public Queue getQueue() {
-         return ref.getQueue();
-      }
-
-      @Override
-      public long getScheduledDeliveryTime() {
-         return ref.getScheduledDeliveryTime();
-      }
-
-      @Override
-      public void incrementDeliveryCount() {
-         ref.incrementDeliveryCount();
-      }
-
-      @Override
-      public void setDeliveryCount(final int deliveryCount) {
-         ref.setDeliveryCount(deliveryCount);
-      }
-
-      @Override
-      public void setScheduledDeliveryTime(final long scheduledDeliveryTime) {
-         ref.setScheduledDeliveryTime(scheduledDeliveryTime);
-      }
-
-      @Override
-      public void acknowledge(Transaction tx) throws Exception {
-         ref.acknowledge(tx);
-      }
-
-      @Override
-      public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
-         ref.acknowledge(tx, consumer);
-      }
-
-      @Override
-      public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
-         ref.acknowledge(tx, reason, consumer);
-      }
-
-      @Override
-      public void setPersistedCount(int count) {
-         ref.setPersistedCount(count);
-      }
-
-      @Override
-      public int getPersistedCount() {
-         return ref.getPersistedCount();
-      }
-
-      @Override
-      public boolean isPaged() {
-         return false;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.server.MessageReference#acknowledge(org.apache.activemq.artemis.core.server.MessageReference)
-       */
-      @Override
-      public void acknowledge() throws Exception {
-         ref.getQueue().acknowledge(this);
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.server.MessageReference#getMessageMemoryEstimate()
-       */
-      @Override
-      public int getMessageMemoryEstimate() {
-         return ref.getMessage().getMemoryEstimate();
-      }
-
-      @Override
-      public void emptyConsumerID() {
-         this.hasConsumerID = false;
-      }
-
-      @Override
-      public void setConsumerId(long consumerID) {
-         this.hasConsumerID = true;
-         this.consumerID = consumerID;
-      }
-
-      @Override
-      public boolean hasConsumerId() {
-         return hasConsumerID;
-      }
-
-      @Override
-      public long getConsumerId() {
-         if (!this.hasConsumerID) {
-            throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
-         }
-         return this.consumerID;
-      }
-
-      @Override
-      public long getPersistentSize() throws ActiveMQException {
-         return ref.getPersistentSize();
-      }
-
-      @Override
-      public String toString() {
-         return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
-      }
-
-   }
-
    @Override
    public int hashCode() {
       final int prime = 31;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
index 1afca46a6a..e3964a1d22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -49,7 +49,7 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
 
    private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
    private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
-   private UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
+   private final UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
 
    //-- START :: ResettableIterator Methods
    // As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6987bcf4a3..8afa15f363 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -194,7 +194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
 
    // This is where messages are stored
-   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
+   protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
 
    private NodeStore<MessageReference> nodeStore;
 
@@ -340,16 +340,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile long ringSize;
 
-   /* in certain cases we need to redeliver a message directly.
-   * it's useful for usecases last LastValueQueue */
-   protected MessageReference nextDelivery() {
-      return null;
-   }
-
-   protected void repeatNextDelivery(MessageReference reference) {
-
-   }
-
    @Override
    public boolean isSwept() {
       return swept;
@@ -2895,7 +2885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private synchronized void doInternalPoll() {
+   synchronized void doInternalPoll() {
 
       int added = 0;
       MessageReference ref;
@@ -2974,27 +2964,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
       consumers.reset();
       while (true) {
-         if (handled == MAX_DELIVERIES_IN_LOOP) {
-            // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
-            // long
-
-            deliverAsync(true);
-
-            return false;
-         }
-
-         if (System.nanoTime() - timeout > 0) {
-            if (logger.isTraceEnabled()) {
-               logger.trace("delivery has been running for too long. Scheduling another delivery task now");
-            }
-
+         if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) {
+            // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
             deliverAsync(true);
-
             return false;
          }
 
          MessageReference ref;
-
          Consumer handledconsumer = null;
 
          synchronized (this) {
@@ -3024,6 +3000,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             if (consumers.hasNext()) {
                holder = consumers.next();
             } else {
+               pruneLastValues();
                break;
             }
 
@@ -3034,15 +3011,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                holder.iter = messageReferences.iterator();
             }
 
-            // LVQ support
-            ref = nextDelivery();
-            boolean nextDelivery = false;
-            if (ref != null) {
-               nextDelivery = true;
-            }
-
-            if (ref == null && holder.iter.hasNext()) {
+            if (holder.iter.hasNext()) {
                ref = holder.iter.next();
+            } else {
+               ref = null;
             }
 
             if (ref == null) {
@@ -3092,18 +3064,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   handled++;
                   consumers.reset();
                } else if (status == HandleStatus.BUSY) {
-                  if (nextDelivery) {
-                     repeatNextDelivery(ref);
-                  } else {
-                     try {
-                        holder.iter.repeat();
-                     } catch (NoSuchElementException e) {
-                        // this could happen if there was an exception on the queue handling
-                        // and it returned BUSY because of that exception
-                        //
-                        // We will just log it as there's nothing else we can do now.
-                        logger.warn(e.getMessage(), e);
-                     }
+                  try {
+                     holder.iter.repeat();
+                  } catch (NoSuchElementException e) {
+                     // this could happen if there was an exception on the queue handling
+                     // and it returned BUSY because of that exception
+                     //
+                     // We will just log it as there's nothing else we can do now.
+                     logger.warn(e.getMessage(), e);
                   }
 
                   noDelivery++;
@@ -3130,6 +3098,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                // Round robin'd all
 
                if (noDelivery == this.consumers.size()) {
+                  pruneLastValues();
+
                   if (handledconsumer != null) {
                      // this shouldn't really happen,
                      // however I'm keeping this as an assertion case future developers ever change the logic here on this class
@@ -3144,6 +3114,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                noDelivery = 0;
             }
+
          }
 
          if (handledconsumer != null) {
@@ -3154,6 +3125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return true;
    }
 
+   // called with 'this' locked
+   protected void pruneLastValues() {
+      // interception point for LVQ
+   }
+
    protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
       holder.iter.remove();
       refRemoved(ref);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
index 844e735bac..8b0e9732ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.junit.Test;
 
 public class JMSLVQTest extends JMSClientTestSupport {
@@ -178,4 +179,35 @@ public class JMSLVQTest extends JMSClientTestSupport {
          p.send(queue1, message2);
       }
    }
+
+   @Test
+   public void testNonDestructiveWithSelector() throws Exception {
+      final String MY_QUEUE = RandomUtil.randomString();
+      final boolean NON_DESTRUCTIVE = true;
+      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST).setNonDestructive(NON_DESTRUCTIVE).setLastValue(true));
+
+      ConnectionSupplier connectionSupplier = CoreConnection;
+
+      Connection consumerConnection1 = connectionSupplier.createConnection();
+      Session consumerSession1 = consumerConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue myQueue = consumerSession1.createQueue(MY_QUEUE);
+      MessageConsumer consumer1 = consumerSession1.createConsumer(myQueue);
+      consumerConnection1.start();
+
+      Connection consumerConnection2 = connectionSupplier.createConnection();
+      Session consumerSession2 = consumerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      myQueue = consumerSession2.createQueue(MY_QUEUE);
+      MessageConsumer consumer2 = consumerSession2.createConsumer(myQueue, "foo='bar'");
+
+      Connection producerConnection = connectionSupplier.createConnection();
+      Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = producerSession.createProducer(myQueue);
+
+      for (int i = 0; i < 1000; i++) {
+         TextMessage m = producerSession.createTextMessage();
+         m.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "abc");
+         p.send(m);
+         assertNotNull(consumer1.receive(500));
+      }
+   }
 }
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 609d7121a0..143cce2a23 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -620,6 +620,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
 
       HashMap<String, Integer> dups = new HashMap<>();
       List<Producer> producers = new ArrayList<>();
+      int receivedTally = 0;
 
       try (Connection connection = connectionSupplier.createConnection()) {
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -641,6 +642,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
             if (tm == null) {
                break;
             }
+            receivedTally++;
             results.get(tm.getStringProperty("lastval")).add(tm.getText());
             tm.acknowledge();
          }
@@ -669,6 +671,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
          Assert.fail("Duplicate messages received " + sb);
       }
 
+      Assert.assertEquals("Got all messages produced", MESSAGE_COUNT_PER_GROUP * GROUP_COUNT * PRODUCER_COUNT, receivedTally);
       Wait.assertEquals((long) GROUP_COUNT, () -> server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 100, false);
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
index f0f6900486..ba1d22d38a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
@@ -87,9 +87,9 @@ public class LVQTest extends JMSTestBase {
          assertNotNull(tm);
          assertEquals("Message 2", tm.getText());
 
-         // It is important to query here
-         // as we shouldn't rely on addHead after the consumer is closed
          org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+         // one message on the queue and one in delivery - the same message if it's an LVQ
+         // LVQ getMessageCount will discount!
          Wait.assertEquals(1, serverQueue::getMessageCount);
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
index 46e27ad9a2..015f34fbb4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
@@ -157,7 +157,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase {
       m = consumer.receive(1000);
       Assert.assertNotNull(m);
       m.acknowledge();
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
+      Assert.assertEquals("m6", m.getBodyBuffer().readString());
       m = consumer.receiveImmediate();
       Assert.assertNull(m);
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index b040f6bc7c..763f6ee3cd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -86,7 +86,7 @@ public class LVQTest extends ActiveMQTestBase {
       ClientMessage m2 = createTextMessage(clientSession, "m2");
       m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
       producer.send(m2);
-      assertEquals(1, server.locateQueue(qName1).getMessageCount());
+      Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
       clientSession.close();
 
       server.stop();
@@ -100,7 +100,8 @@ public class LVQTest extends ActiveMQTestBase {
       ClientMessage m3 = createTextMessage(clientSession, "m3");
       m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
       producer.send(m3);
-      assertEquals(1, server.locateQueue(qName1).getMessageCount());
+      // wait b/c prune takes a deliver attempt which is async
+      Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
 
       ClientConsumer consumer = clientSession.createConsumer(qName1);
       clientSession.start();
@@ -293,7 +294,6 @@ public class LVQTest extends ActiveMQTestBase {
    @Test
    public void testMultipleMessagesInTx() throws Exception {
       ClientProducer producer = clientSessionTxReceives.createProducer(address);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
       SimpleString messageId1 = new SimpleString("SMID1");
       SimpleString messageId2 = new SimpleString("SMID2");
       ClientMessage m1 = createTextMessage(clientSession, "m1");
@@ -308,6 +308,7 @@ public class LVQTest extends ActiveMQTestBase {
       producer.send(m2);
       producer.send(m3);
       producer.send(m4);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
       clientSessionTxReceives.start();
       ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
@@ -392,6 +393,7 @@ public class LVQTest extends ActiveMQTestBase {
    public void testMultipleMessagesInTxSend() throws Exception {
       ClientProducer producer = clientSessionTxSends.createProducer(address);
       ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+      clientSessionTxSends.start();
       SimpleString rh = new SimpleString("SMID1");
       ClientMessage m1 = createTextMessage(clientSession, "m1");
       m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@@ -412,11 +414,18 @@ public class LVQTest extends ActiveMQTestBase {
       producer.send(m5);
       producer.send(m6);
       clientSessionTxSends.commit();
-      clientSessionTxSends.start();
+      for (int i = 1; i < 6; i++) {
+         ClientMessage m = consumer.receive(1000);
+         Assert.assertNotNull(m);
+         m.acknowledge();
+         Assert.assertEquals("m" + i, m.getBodyBuffer().readString());
+      }
+      consumer.close();
+      consumer = clientSessionTxSends.createConsumer(qName1);
       ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
       m.acknowledge();
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
+      Assert.assertEquals("m6", m.getBodyBuffer().readString());
    }
 
    @Test
@@ -460,7 +469,6 @@ public class LVQTest extends ActiveMQTestBase {
    @Test
    public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception {
       ClientProducer producer = clientSessionTxSends.createProducer(address);
-      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
       SimpleString rh = new SimpleString("SMID1");
       ClientMessage m1 = createTextMessage(clientSession, "m1");
       m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@@ -488,6 +496,7 @@ public class LVQTest extends ActiveMQTestBase {
       producer.send(m6);
       clientSessionTxSends.commit();
       clientSessionTxSends.start();
+      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
       ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
       m.acknowledge();
@@ -705,7 +714,6 @@ public class LVQTest extends ActiveMQTestBase {
    @Test
    public void testLargeMessage() throws Exception {
       ClientProducer producer = clientSessionTxReceives.createProducer(address);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
       SimpleString rh = new SimpleString("SMID1");
 
       for (int i = 0; i < 50; i++) {
@@ -715,6 +723,7 @@ public class LVQTest extends ActiveMQTestBase {
          producer.send(message);
          clientSession.commit();
       }
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
       clientSessionTxReceives.start();
       ClientMessage m = consumer.receive(1000);
       Assert.assertNotNull(m);
@@ -736,11 +745,11 @@ public class LVQTest extends ActiveMQTestBase {
 
       Queue queue = server.locateQueue(qName1);
       producer.send(m1);
-      long oldSize = queue.getPersistentSize();
+      Wait.assertEquals(123, () -> queue.getPersistentSize());
       producer.send(m2);
+      // encoded size is a little larger than payload
+      Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024);
       assertEquals(queue.getDeliveringSize(), 0);
-      assertNotEquals(queue.getPersistentSize(), oldSize);
-      assertTrue(queue.getPersistentSize() > 10 * 1024);
    }
 
    @Test
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index 373619139c..47f3761f52 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -1032,6 +1032,41 @@ public class LinkedListTest extends ActiveMQTestBase {
 
    }
 
+   @Test
+   public void testRemoveLastNudgeNoReplay() {
+      for (int i = 1; i < 3; i++) {
+         doTestRemoveLastNudgeNoReplay(i);
+      }
+   }
+
+   private void doTestRemoveLastNudgeNoReplay(int num) {
+
+      LinkedListIterator<Integer> iter = list.iterator();
+
+      for (int i = 0; i < num; i++) {
+         list.addTail(i);
+      }
+
+      // exhaust iterator
+      for (int i = 0; i < num; i++) {
+         assertTrue(iter.hasNext());
+         assertEquals(i, iter.next().intValue());
+      }
+
+      // remove last
+      LinkedListIterator<Integer> pruneIterator = list.iterator();
+      while (pruneIterator.hasNext()) {
+         int v = pruneIterator.next();
+         if (v == num - 1) {
+            pruneIterator.remove();
+         }
+      }
+
+      // ensure existing iterator does not reset or replay
+      assertFalse(iter.hasNext());
+      assertEquals(num - 1, list.size());
+   }
+
    @Test
    public void testGCNepotismPoll() {
       final int count = 100;