You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/20 16:08:25 UTC

[activemq-artemis] branch master updated: ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new ccc0fa7  ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer
     new bc55a2f  This closes #3216
ccc0fa7 is described below

commit ccc0fa7100575fb5b0a73229f0762829bcab2b5a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jul 10 12:12:09 2020 -0400

    ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer
---
 .../artemis/core/server/impl/LastValueQueue.java   | 39 ++++++++++++++++
 .../artemis/core/server/impl/QueueImpl.java        |  4 +-
 .../tests/integration/jms/client/LVQTest.java      | 54 ++++++++++++++++++++++
 3 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 90cd148..9f3c82b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -154,6 +154,21 @@ public class LastValueQueue extends QueueImpl {
 
             replaceLVQMessage(ref, hr);
 
+            if (isNonDestructive() && hr.isDelivered()) {
+               hr.resetDelivered();
+               // --------------------------------------------------------------------------------
+               // If non Destructive, and if a reference was previously delivered
+               // we would not be able to receive this message again
+               // unless we reset the iterators
+               // The message is not removed, so we can't actually remove it
+               // a result of this operation is that previously delivered messages
+               // will probably be delivered again.
+               // if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
+               // operation on the iterator for a single message
+               resetAllIterators();
+               deliverAsync();
+            }
+
          } else {
             hr = new HolderReference(prop, ref);
 
@@ -166,6 +181,18 @@ public class LastValueQueue extends QueueImpl {
       }
    }
 
+
+   @Override
+   public long getMessageCount() {
+      if (pageSubscription != null) {
+         // messageReferences will have depaged messages which we need to discount from the counter as they are
+         // counted on the pageSubscription as well
+         return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
+      } else {
+         return (long) pendingMetrics.getMessageCount() + getScheduledCount();
+      }
+   }
+
    @Override
    public synchronized void addHead(final MessageReference ref, boolean scheduling) {
       // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
@@ -301,12 +328,23 @@ public class LastValueQueue extends QueueImpl {
 
       private final SimpleString prop;
 
+      private volatile boolean delivered = false;
+
       private volatile MessageReference ref;
 
       private long consumerID;
 
       private boolean hasConsumerID = false;
 
+
+      public void resetDelivered() {
+         delivered = false;
+      }
+
+      public boolean isDelivered() {
+         return delivered;
+      }
+
       HolderReference(final SimpleString prop, final MessageReference ref) {
          this.prop = prop;
 
@@ -324,6 +362,7 @@ public class LastValueQueue extends QueueImpl {
 
       @Override
       public void handled() {
+         delivered = true;
          // We need to remove the entry from the map just before it gets delivered
          ref.handled();
          if (!ref.getQueue().isNonDestructive()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 040e4a3..7e88817 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -170,7 +170,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final PagingStore pagingStore;
 
-   private final PageSubscription pageSubscription;
+   protected final PageSubscription pageSubscription;
 
    private ReferenceCounter refCountForConsumers;
 
@@ -192,7 +192,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
-   private final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
+   protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
 
    private final QueueMessageMetrics deliveringMetrics = new QueueMessageMetrics(this, "delivering");
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
index 1274713..f0f6900 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,6 +50,59 @@ public class LVQTest extends JMSTestBase {
    }
 
    @Test
+   public void testLVQandNonDestructive() throws Exception {
+      ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();
+      fact.setConsumerWindowSize(0);
+
+      try (Connection connection = fact.createConnection();
+           Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+
+         // swapping these two lines makes the test either succeed for fail
+         // Queue queue = session.createQueue("random?last-value=true");
+         Queue queue = session.createQueue("random?last-value=true&non-destructive=true");
+
+         MessageProducer producer = session.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         connection.start();
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message 1");
+         message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
+         producer.send(message);
+
+         TextMessage tm = (TextMessage) consumer.receive(2000);
+         assertNotNull(tm);
+         tm.acknowledge();
+
+         Thread.sleep(1000);
+         assertEquals("Message 1", tm.getText());
+
+         message = session.createTextMessage();
+         message.setText("Message 2");
+         message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
+         producer.send(message);
+
+         tm = (TextMessage) consumer.receive(2000);
+         assertNotNull(tm);
+         assertEquals("Message 2", tm.getText());
+
+         // It is important to query here
+         // as we shouldn't rely on addHead after the consumer is closed
+         org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+         Wait.assertEquals(1, serverQueue::getMessageCount);
+      }
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
+      Wait.assertEquals(1, serverQueue::getMessageCount);
+
+      serverQueue.deleteMatchingReferences(null);
+      // This should be removed all
+      assertEquals(0, serverQueue.getMessageCount());
+
+   }
+
+   @Test
    public void testLastValueQueueUsingAddressQueueParameters() throws Exception {
       ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();