You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/07/13 04:01:23 UTC
[activemq-artemis] branch main updated: ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl`
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4a4765c39c ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl`
4a4765c39c is described below
commit 4a4765c39cb73438ea2199b6e0937566d3556c10
Author: Gary Tully <ga...@gmail.com>
AuthorDate: Tue Jun 7 10:49:36 2022 -0500
ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl`
---
.../artemis/utils/collections/LinkedListImpl.java | 4 +
.../utils/collections/PriorityLinkedListImpl.java | 2 +-
.../artemis/core/server/impl/LastValueQueue.java | 437 +++------------------
.../core/server/impl/QueueConsumersImpl.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 72 ++--
.../artemis/tests/integration/amqp/JMSLVQTest.java | 32 ++
.../integration/amqp/JMSNonDestructiveTest.java | 3 +
.../tests/integration/jms/client/LVQTest.java | 4 +-
.../tests/integration/server/LVQRecoveryTest.java | 2 +-
.../artemis/tests/integration/server/LVQTest.java | 29 +-
.../artemis/tests/unit/util/LinkedListTest.java | 35 ++
11 files changed, 183 insertions(+), 439 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 9f12b47246..4f3b8e9dc9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -538,6 +538,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
current = current.prev;
current.iterCount++;
+
+ if (last == node) {
+ last = current;
+ }
} else {
current = null;
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
index f1e50d3261..4d01441c9b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
@@ -30,7 +30,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");
- protected LinkedListImpl<E>[] levels;
+ protected final LinkedListImpl<E>[] levels;
private volatile int size;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index f01403fb90..50dc850c09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,16 +17,11 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -38,13 +33,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
/**
@@ -59,25 +54,9 @@ import org.jboss.logging.Logger;
public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class);
- private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
+ private final Map<SimpleString, MessageReference> map = new HashMap<>();
private final SimpleString lastValueKey;
- // only use this within synchronized methods or synchronized(this) blocks
- protected final LinkedList<MessageReference> nextDeliveries = new LinkedList<>();
-
-
- /* in certain cases we need to redeliver a message */
- @Override
- protected MessageReference nextDelivery() {
- return nextDeliveries.poll();
- }
-
- @Override
- protected void repeatNextDelivery(MessageReference reference) {
- // put the ref back onto the head of the list so that the next time poll() is called this ref is returned
- nextDeliveries.addFirst(reference);
- }
-
@Deprecated
public LastValueQueue(final long persistenceID,
@@ -162,116 +141,49 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addTail(final MessageReference ref, final boolean direct) {
- if (scheduleIfPossible(ref)) {
- return;
- }
- final SimpleString prop = ref.getLastValueProperty();
-
- if (prop != null) {
- HolderReference hr = map.get(prop);
-
- if (hr != null) {
- if (isNonDestructive() && hr.isInDelivery()) {
- // if the ref is already being delivered we'll do the replace in the postAcknowledge
- hr.setReplacementRef(ref);
- } else {
- // We need to overwrite the old ref with the new one and ack the old one
- replaceLVQMessage(ref, hr);
-
- if (isNonDestructive() && hr.isDelivered()) {
- hr.resetDelivered();
- // since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement
- nextDeliveries.add(hr);
- deliverAsync();
- }
- }
- } else {
- hr = new HolderReference(prop, ref);
-
- map.put(prop, hr);
-
- super.addTail(hr, isNonDestructive() ? false : direct);
- }
- } else {
+ if (!scheduleIfPossible(ref)) {
+ trackLastValue(ref);
super.addTail(ref, isNonDestructive() ? false : direct);
}
}
-
@Override
- public long getMessageCount() {
- if (pageSubscription != null) {
- // messageReferences will have depaged messages which we need to discount from the counter as they are
- // counted on the pageSubscription as well
- return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
- } else {
- return (long) pendingMetrics.getMessageCount() + getScheduledCount();
- }
+ public void addHead(final MessageReference ref, boolean scheduling) {
+ if (scheduling) {
+ // track last value when scheduled message is actually enqueued
+ trackLastValue(ref);
+ } else if (isNonDestructive() == false) {
+ // for released messages from a consumer or tx that have been destroyed,
+ // use as a last value in the absence of any newer value, it may be stale
+ trackLastValueIfAbsent(ref);
+ }
+ super.addHead(ref, scheduling);
}
- /** LVQ has to use regular addHead due to last value queues calculations */
@Override
- public void addSorted(MessageReference ref, boolean scheduling) {
- this.addHead(ref, scheduling);
+ public void addSorted(final MessageReference ref, boolean scheduling) {
+ addHead(ref, scheduling);
}
- /** LVQ has to use regular addHead due to last value queues calculations */
- @Override
- public void addSorted(List<MessageReference> refs, boolean scheduling) {
- this.addHead(refs, scheduling);
- }
-
- @Override
- public synchronized void addHead(final MessageReference ref, boolean scheduling) {
- // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
- if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
- return;
+ private void trackLastValue(MessageReference ref) {
+ final SimpleString lastValueProperty = ref.getLastValueProperty();
+ if (lastValueProperty != null) {
+ map.put(lastValueProperty, ref);
}
+ }
- SimpleString lastValueProp = ref.getLastValueProperty();
-
- if (lastValueProp != null) {
- HolderReference hr = map.get(lastValueProp);
-
- if (hr != null) {
- if (scheduling) {
- // We need to overwrite the old ref with the new one and ack the old one
-
- replaceLVQMessage(ref, hr);
- } else {
- // We keep the current ref and ack the one we are returning
-
- super.referenceHandled(ref);
-
- try {
- super.acknowledge(ref);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
- }
- }
- } else {
- hr = new HolderReference(lastValueProp, ref);
-
- map.put(lastValueProp, hr);
-
- super.addHead(hr, scheduling);
- }
- } else {
- super.addHead(ref, scheduling);
+ private void trackLastValueIfAbsent(MessageReference ref) {
+ final SimpleString lastValueProperty = ref.getLastValueProperty();
+ if (lastValueProperty != null) {
+ map.putIfAbsent(lastValueProperty, ref);
}
}
@Override
- public void postAcknowledge(final MessageReference ref, AckReason reason) {
- if (isNonDestructive()) {
- if (ref instanceof HolderReference) {
- HolderReference hr = (HolderReference) ref;
- if (hr.getReplacementRef() != null) {
- replaceLVQMessage(hr.getReplacementRef(), hr);
- }
- }
- }
- super.postAcknowledge(ref, reason);
+ public long getMessageCount() {
+ // with LV - delivered messages can remain on the queue so the delivering count
+ // count must be discounted else we are accounting the same message more than once
+ return super.getMessageCount() - getDeliveringCount();
}
@Override
@@ -284,21 +196,36 @@ public class LastValueQueue extends QueueImpl {
return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
}
- private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
- MessageReference oldRef = hr.getReference();
-
- referenceHandled(oldRef);
- super.refRemoved(oldRef);
-
- try {
- oldRef.acknowledge(null, AckReason.REPLACED, null);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+ @Override
+ protected void pruneLastValues() {
+ // called with synchronized(this) from super.deliver()
+ try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
+ while (iter.hasNext()) {
+ MessageReference ref = iter.next();
+ if (!currentLastValue(ref)) {
+ iter.remove();
+ try {
+ referenceHandled(ref);
+ super.refRemoved(ref);
+ ref.acknowledge(null, AckReason.REPLACED, null);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+ }
+ }
+ }
}
+ }
- hr.setReference(ref);
- addRefSize(ref);
- refAdded(ref);
+ private boolean currentLastValue(final MessageReference ref) {
+ boolean currentLastValue = false;
+ SimpleString lastValueProp = ref.getLastValueProperty();
+ if (lastValueProp != null) {
+ MessageReference current = map.get(lastValueProp);
+ if (current == ref) {
+ currentLastValue = true;
+ }
+ }
+ return currentLastValue;
}
@Override
@@ -327,16 +254,9 @@ public class LastValueQueue extends QueueImpl {
}
@Override
- public synchronized void reload(final MessageReference ref) {
- // repopulate LVQ map & reload proper HolderReferences
- SimpleString lastValueProp = ref.getLastValueProperty();
- if (lastValueProp != null) {
- HolderReference hr = new HolderReference(lastValueProp, ref);
- map.put(lastValueProp, hr);
- super.reload(hr);
- } else {
- super.reload(ref);
- }
+ public synchronized void reload(final MessageReference newRef) {
+ trackLastValue(newRef);
+ super.reload(newRef);
}
private synchronized void removeIfCurrent(MessageReference ref) {
@@ -361,9 +281,6 @@ public class LastValueQueue extends QueueImpl {
};
}
-
-
-
@Override
public boolean isLastValue() {
return true;
@@ -378,238 +295,6 @@ public class LastValueQueue extends QueueImpl {
return Collections.unmodifiableSet(map.keySet());
}
- private static class HolderReference implements MessageReference {
-
- private final SimpleString prop;
-
- private volatile boolean delivered = false;
-
- private volatile MessageReference ref;
-
- private volatile MessageReference replacementRef;
-
- private long consumerID;
-
- private boolean hasConsumerID = false;
-
-
- public MessageReference getReplacementRef() {
- return replacementRef;
- }
-
- public void setReplacementRef(MessageReference replacementRef) {
- this.replacementRef = replacementRef;
- }
-
- public void resetDelivered() {
- delivered = false;
- }
-
- public boolean isDelivered() {
- return delivered;
- }
-
- HolderReference(final SimpleString prop, final MessageReference ref) {
- this.prop = prop;
-
- this.ref = ref;
- }
-
- @Override
- public void onDelivery(Consumer<? super MessageReference> callback) {
- // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
- }
-
- MessageReference getReference() {
- return ref;
- }
-
- @Override
- public void handled() {
- delivered = true;
- // We need to remove the entry from the map just before it gets delivered
- ref.handled();
- if (!ref.getQueue().isNonDestructive()) {
- ((LastValueQueue) ref.getQueue()).removeIfCurrent(this);
- }
- }
-
- @Override
- public void setInDelivery(boolean inDelivery) {
- ref.setInDelivery(inDelivery);
- }
-
- @Override
- public boolean isInDelivery() {
- return ref.isInDelivery();
- }
-
- @Override
- public Object getProtocolData() {
- return ref.getProtocolData();
- }
-
- @Override
- public void setProtocolData(Object data) {
- ref.setProtocolData(data);
- }
-
- @Override
- public void setAlreadyAcked() {
- ref.setAlreadyAcked();
- }
-
- @Override
- public boolean isAlreadyAcked() {
- return ref.isAlreadyAcked();
- }
-
- void setReference(final MessageReference ref) {
- this.ref = ref;
- }
-
- @Override
- public MessageReference copy(final Queue queue) {
- return ref.copy(queue);
- }
-
- @Override
- public void decrementDeliveryCount() {
- ref.decrementDeliveryCount();
- }
-
- @Override
- public int getDeliveryCount() {
- return ref.getDeliveryCount();
- }
-
- @Override
- public Message getMessage() {
- return ref.getMessage();
- }
-
- @Override
- public long getMessageID() {
- return ref.getMessageID();
- }
-
- @Override
- public boolean isDurable() {
- return getMessage().isDurable();
- }
-
- @Override
- public SimpleString getLastValueProperty() {
- return prop;
- }
-
- @Override
- public Queue getQueue() {
- return ref.getQueue();
- }
-
- @Override
- public long getScheduledDeliveryTime() {
- return ref.getScheduledDeliveryTime();
- }
-
- @Override
- public void incrementDeliveryCount() {
- ref.incrementDeliveryCount();
- }
-
- @Override
- public void setDeliveryCount(final int deliveryCount) {
- ref.setDeliveryCount(deliveryCount);
- }
-
- @Override
- public void setScheduledDeliveryTime(final long scheduledDeliveryTime) {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- @Override
- public void acknowledge(Transaction tx) throws Exception {
- ref.acknowledge(tx);
- }
-
- @Override
- public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
- ref.acknowledge(tx, consumer);
- }
-
- @Override
- public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
- ref.acknowledge(tx, reason, consumer);
- }
-
- @Override
- public void setPersistedCount(int count) {
- ref.setPersistedCount(count);
- }
-
- @Override
- public int getPersistedCount() {
- return ref.getPersistedCount();
- }
-
- @Override
- public boolean isPaged() {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.server.MessageReference#acknowledge(org.apache.activemq.artemis.core.server.MessageReference)
- */
- @Override
- public void acknowledge() throws Exception {
- ref.getQueue().acknowledge(this);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.core.server.MessageReference#getMessageMemoryEstimate()
- */
- @Override
- public int getMessageMemoryEstimate() {
- return ref.getMessage().getMemoryEstimate();
- }
-
- @Override
- public void emptyConsumerID() {
- this.hasConsumerID = false;
- }
-
- @Override
- public void setConsumerId(long consumerID) {
- this.hasConsumerID = true;
- this.consumerID = consumerID;
- }
-
- @Override
- public boolean hasConsumerId() {
- return hasConsumerID;
- }
-
- @Override
- public long getConsumerId() {
- if (!this.hasConsumerID) {
- throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
- }
- return this.consumerID;
- }
-
- @Override
- public long getPersistentSize() throws ActiveMQException {
- return ref.getPersistentSize();
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
- }
-
- }
-
@Override
public int hashCode() {
final int prime = 31;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
index 1afca46a6a..e3964a1d22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -49,7 +49,7 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
- private UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
+ private final UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
//-- START :: ResettableIterator Methods
// As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6987bcf4a3..8afa15f363 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -194,7 +194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
// This is where messages are stored
- private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
+ protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
private NodeStore<MessageReference> nodeStore;
@@ -340,16 +340,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile long ringSize;
- /* in certain cases we need to redeliver a message directly.
- * it's useful for usecases last LastValueQueue */
- protected MessageReference nextDelivery() {
- return null;
- }
-
- protected void repeatNextDelivery(MessageReference reference) {
-
- }
-
@Override
public boolean isSwept() {
return swept;
@@ -2895,7 +2885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
- private synchronized void doInternalPoll() {
+ synchronized void doInternalPoll() {
int added = 0;
MessageReference ref;
@@ -2974,27 +2964,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
consumers.reset();
while (true) {
- if (handled == MAX_DELIVERIES_IN_LOOP) {
- // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
- // long
-
- deliverAsync(true);
-
- return false;
- }
-
- if (System.nanoTime() - timeout > 0) {
- if (logger.isTraceEnabled()) {
- logger.trace("delivery has been running for too long. Scheduling another delivery task now");
- }
-
+ if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) {
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
deliverAsync(true);
-
return false;
}
MessageReference ref;
-
Consumer handledconsumer = null;
synchronized (this) {
@@ -3024,6 +3000,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumers.hasNext()) {
holder = consumers.next();
} else {
+ pruneLastValues();
break;
}
@@ -3034,15 +3011,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
holder.iter = messageReferences.iterator();
}
- // LVQ support
- ref = nextDelivery();
- boolean nextDelivery = false;
- if (ref != null) {
- nextDelivery = true;
- }
-
- if (ref == null && holder.iter.hasNext()) {
+ if (holder.iter.hasNext()) {
ref = holder.iter.next();
+ } else {
+ ref = null;
}
if (ref == null) {
@@ -3092,18 +3064,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
- if (nextDelivery) {
- repeatNextDelivery(ref);
- } else {
- try {
- holder.iter.repeat();
- } catch (NoSuchElementException e) {
- // this could happen if there was an exception on the queue handling
- // and it returned BUSY because of that exception
- //
- // We will just log it as there's nothing else we can do now.
- logger.warn(e.getMessage(), e);
- }
+ try {
+ holder.iter.repeat();
+ } catch (NoSuchElementException e) {
+ // this could happen if there was an exception on the queue handling
+ // and it returned BUSY because of that exception
+ //
+ // We will just log it as there's nothing else we can do now.
+ logger.warn(e.getMessage(), e);
}
noDelivery++;
@@ -3130,6 +3098,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Round robin'd all
if (noDelivery == this.consumers.size()) {
+ pruneLastValues();
+
if (handledconsumer != null) {
// this shouldn't really happen,
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
@@ -3144,6 +3114,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
noDelivery = 0;
}
+
}
if (handledconsumer != null) {
@@ -3154,6 +3125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
+ // called with 'this' locked
+ protected void pruneLastValues() {
+ // interception point for LVQ
+ }
+
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
holder.iter.remove();
refRemoved(ref);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
index 844e735bac..8b0e9732ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Test;
public class JMSLVQTest extends JMSClientTestSupport {
@@ -178,4 +179,35 @@ public class JMSLVQTest extends JMSClientTestSupport {
p.send(queue1, message2);
}
}
+
+ @Test
+ public void testNonDestructiveWithSelector() throws Exception {
+ final String MY_QUEUE = RandomUtil.randomString();
+ final boolean NON_DESTRUCTIVE = true;
+ server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST).setNonDestructive(NON_DESTRUCTIVE).setLastValue(true));
+
+ ConnectionSupplier connectionSupplier = CoreConnection;
+
+ Connection consumerConnection1 = connectionSupplier.createConnection();
+ Session consumerSession1 = consumerConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue myQueue = consumerSession1.createQueue(MY_QUEUE);
+ MessageConsumer consumer1 = consumerSession1.createConsumer(myQueue);
+ consumerConnection1.start();
+
+ Connection consumerConnection2 = connectionSupplier.createConnection();
+ Session consumerSession2 = consumerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ myQueue = consumerSession2.createQueue(MY_QUEUE);
+ MessageConsumer consumer2 = consumerSession2.createConsumer(myQueue, "foo='bar'");
+
+ Connection producerConnection = connectionSupplier.createConnection();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = producerSession.createProducer(myQueue);
+
+ for (int i = 0; i < 1000; i++) {
+ TextMessage m = producerSession.createTextMessage();
+ m.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "abc");
+ p.send(m);
+ assertNotNull(consumer1.receive(500));
+ }
+ }
}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 609d7121a0..143cce2a23 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -620,6 +620,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
HashMap<String, Integer> dups = new HashMap<>();
List<Producer> producers = new ArrayList<>();
+ int receivedTally = 0;
try (Connection connection = connectionSupplier.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -641,6 +642,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
if (tm == null) {
break;
}
+ receivedTally++;
results.get(tm.getStringProperty("lastval")).add(tm.getText());
tm.acknowledge();
}
@@ -669,6 +671,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
Assert.fail("Duplicate messages received " + sb);
}
+ Assert.assertEquals("Got all messages produced", MESSAGE_COUNT_PER_GROUP * GROUP_COUNT * PRODUCER_COUNT, receivedTally);
Wait.assertEquals((long) GROUP_COUNT, () -> server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 100, false);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
index f0f6900486..ba1d22d38a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
@@ -87,9 +87,9 @@ public class LVQTest extends JMSTestBase {
assertNotNull(tm);
assertEquals("Message 2", tm.getText());
- // It is important to query here
- // as we shouldn't rely on addHead after the consumer is closed
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+ // one message on the queue and one in delivery - the same message if it's an LVQ
+ // LVQ getMessageCount will discount!
Wait.assertEquals(1, serverQueue::getMessageCount);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
index 46e27ad9a2..015f34fbb4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java
@@ -157,7 +157,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase {
m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
- Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
+ Assert.assertEquals("m6", m.getBodyBuffer().readString());
m = consumer.receiveImmediate();
Assert.assertNull(m);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index b040f6bc7c..763f6ee3cd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -86,7 +86,7 @@ public class LVQTest extends ActiveMQTestBase {
ClientMessage m2 = createTextMessage(clientSession, "m2");
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m2);
- assertEquals(1, server.locateQueue(qName1).getMessageCount());
+ Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
clientSession.close();
server.stop();
@@ -100,7 +100,8 @@ public class LVQTest extends ActiveMQTestBase {
ClientMessage m3 = createTextMessage(clientSession, "m3");
m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m3);
- assertEquals(1, server.locateQueue(qName1).getMessageCount());
+ // wait b/c prune takes a deliver attempt which is async
+ Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
ClientConsumer consumer = clientSession.createConsumer(qName1);
clientSession.start();
@@ -293,7 +294,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testMultipleMessagesInTx() throws Exception {
ClientProducer producer = clientSessionTxReceives.createProducer(address);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
SimpleString messageId1 = new SimpleString("SMID1");
SimpleString messageId2 = new SimpleString("SMID2");
ClientMessage m1 = createTextMessage(clientSession, "m1");
@@ -308,6 +308,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m2);
producer.send(m3);
producer.send(m4);
+ ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
@@ -392,6 +393,7 @@ public class LVQTest extends ActiveMQTestBase {
public void testMultipleMessagesInTxSend() throws Exception {
ClientProducer producer = clientSessionTxSends.createProducer(address);
ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+ clientSessionTxSends.start();
SimpleString rh = new SimpleString("SMID1");
ClientMessage m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@@ -412,11 +414,18 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m5);
producer.send(m6);
clientSessionTxSends.commit();
- clientSessionTxSends.start();
+ for (int i = 1; i < 6; i++) {
+ ClientMessage m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals("m" + i, m.getBodyBuffer().readString());
+ }
+ consumer.close();
+ consumer = clientSessionTxSends.createConsumer(qName1);
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
- Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
+ Assert.assertEquals("m6", m.getBodyBuffer().readString());
}
@Test
@@ -460,7 +469,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception {
ClientProducer producer = clientSessionTxSends.createProducer(address);
- ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
SimpleString rh = new SimpleString("SMID1");
ClientMessage m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@@ -488,6 +496,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m6);
clientSessionTxSends.commit();
clientSessionTxSends.start();
+ ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
@@ -705,7 +714,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testLargeMessage() throws Exception {
ClientProducer producer = clientSessionTxReceives.createProducer(address);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
SimpleString rh = new SimpleString("SMID1");
for (int i = 0; i < 50; i++) {
@@ -715,6 +723,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(message);
clientSession.commit();
}
+ ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
@@ -736,11 +745,11 @@ public class LVQTest extends ActiveMQTestBase {
Queue queue = server.locateQueue(qName1);
producer.send(m1);
- long oldSize = queue.getPersistentSize();
+ Wait.assertEquals(123, () -> queue.getPersistentSize());
producer.send(m2);
+ // encoded size is a little larger than payload
+ Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024);
assertEquals(queue.getDeliveringSize(), 0);
- assertNotEquals(queue.getPersistentSize(), oldSize);
- assertTrue(queue.getPersistentSize() > 10 * 1024);
}
@Test
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index 373619139c..47f3761f52 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -1032,6 +1032,41 @@ public class LinkedListTest extends ActiveMQTestBase {
}
+ @Test
+ public void testRemoveLastNudgeNoReplay() {
+ for (int i = 1; i < 3; i++) {
+ doTestRemoveLastNudgeNoReplay(i);
+ }
+ }
+
+ private void doTestRemoveLastNudgeNoReplay(int num) {
+
+ LinkedListIterator<Integer> iter = list.iterator();
+
+ for (int i = 0; i < num; i++) {
+ list.addTail(i);
+ }
+
+ // exhaust iterator
+ for (int i = 0; i < num; i++) {
+ assertTrue(iter.hasNext());
+ assertEquals(i, iter.next().intValue());
+ }
+
+ // remove last
+ LinkedListIterator<Integer> pruneIterator = list.iterator();
+ while (pruneIterator.hasNext()) {
+ int v = pruneIterator.next();
+ if (v == num - 1) {
+ pruneIterator.remove();
+ }
+ }
+
+ // ensure existing iterator does not reset or replay
+ assertFalse(iter.hasNext());
+ assertEquals(num - 1, list.size());
+ }
+
@Test
public void testGCNepotismPoll() {
final int count = 100;