You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/20 16:08:25 UTC
[activemq-artemis] branch master updated: ARTEMIS-2843
non-destructive LVQ not delivering msg to consumer
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new ccc0fa7 ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer
new bc55a2f This closes #3216
ccc0fa7 is described below
commit ccc0fa7100575fb5b0a73229f0762829bcab2b5a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jul 10 12:12:09 2020 -0400
ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer
---
.../artemis/core/server/impl/LastValueQueue.java | 39 ++++++++++++++++
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../tests/integration/jms/client/LVQTest.java | 54 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 2 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 90cd148..9f3c82b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -154,6 +154,21 @@ public class LastValueQueue extends QueueImpl {
replaceLVQMessage(ref, hr);
+ if (isNonDestructive() && hr.isDelivered()) {
+ hr.resetDelivered();
+ // --------------------------------------------------------------------------------
+ // If non Destructive, and if a reference was previously delivered
+ // we would not be able to receive this message again
+ // unless we reset the iterators
+ // The message is not removed, so we can't actually remove it
+ // a result of this operation is that previously delivered messages
+ // will probably be delivered again.
+ // if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
+ // operation on the iterator for a single message
+ resetAllIterators();
+ deliverAsync();
+ }
+
} else {
hr = new HolderReference(prop, ref);
@@ -166,6 +181,18 @@ public class LastValueQueue extends QueueImpl {
}
}
+
+ @Override
+ public long getMessageCount() {
+ if (pageSubscription != null) {
+ // messageReferences will have depaged messages which we need to discount from the counter as they are
+ // counted on the pageSubscription as well
+ return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
+ } else {
+ return (long) pendingMetrics.getMessageCount() + getScheduledCount();
+ }
+ }
+
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
@@ -301,12 +328,23 @@ public class LastValueQueue extends QueueImpl {
private final SimpleString prop;
+ private volatile boolean delivered = false;
+
private volatile MessageReference ref;
private long consumerID;
private boolean hasConsumerID = false;
+
+ public void resetDelivered() {
+ delivered = false;
+ }
+
+ public boolean isDelivered() {
+ return delivered;
+ }
+
HolderReference(final SimpleString prop, final MessageReference ref) {
this.prop = prop;
@@ -324,6 +362,7 @@ public class LastValueQueue extends QueueImpl {
@Override
public void handled() {
+ delivered = true;
// We need to remove the entry from the map just before it gets delivered
ref.handled();
if (!ref.getQueue().isNonDestructive()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 040e4a3..7e88817 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -170,7 +170,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final PagingStore pagingStore;
- private final PageSubscription pageSubscription;
+ protected final PageSubscription pageSubscription;
private ReferenceCounter refCountForConsumers;
@@ -192,7 +192,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
- private final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
+ protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
private final QueueMessageMetrics deliveringMetrics = new QueueMessageMetrics(this, "delivering");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
index 1274713..f0f6900 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
@@ -49,6 +50,59 @@ public class LVQTest extends JMSTestBase {
}
@Test
+ public void testLVQandNonDestructive() throws Exception {
+ ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();
+ fact.setConsumerWindowSize(0);
+
+ try (Connection connection = fact.createConnection();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+
+ // swapping these two lines makes the test either succeed for fail
+ // Queue queue = session.createQueue("random?last-value=true");
+ Queue queue = session.createQueue("random?last-value=true&non-destructive=true");
+
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ TextMessage message = session.createTextMessage();
+ message.setText("Message 1");
+ message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
+ producer.send(message);
+
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ tm.acknowledge();
+
+ Thread.sleep(1000);
+ assertEquals("Message 1", tm.getText());
+
+ message = session.createTextMessage();
+ message.setText("Message 2");
+ message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
+ producer.send(message);
+
+ tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("Message 2", tm.getText());
+
+ // It is important to query here
+ // as we shouldn't rely on addHead after the consumer is closed
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+ Wait.assertEquals(1, serverQueue::getMessageCount);
+ }
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+ Wait.assertEquals(1, serverQueue::getMessageCount);
+
+ serverQueue.deleteMatchingReferences(null);
+ // This should be removed all
+ assertEquals(0, serverQueue.getMessageCount());
+
+ }
+
+ @Test
public void testLastValueQueueUsingAddressQueueParameters() throws Exception {
ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();