You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/02 00:19:35 UTC

[activemq-artemis] branch master updated: ARTEMIS-3089 direct delivery can break LVQ+non-destructive

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e114d  ARTEMIS-3089 direct delivery can break LVQ+non-destructive
     new 36b3289  This closes #3424
d9e114d is described below

commit d9e114da551a9e35e8dcefa1d25bad3125dcd038
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jan 29 15:17:23 2021 -0600

    ARTEMIS-3089 direct delivery can break LVQ+non-destructive
---
 .../artemis/core/server/impl/LastValueQueue.java   |  4 +-
 .../integration/amqp/JMSNonDestructiveTest.java    | 72 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index d0ee971..1df1bce 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -174,10 +174,10 @@ public class LastValueQueue extends QueueImpl {
 
             map.put(prop, hr);
 
-            super.addTail(hr, direct);
+            super.addTail(hr, isNonDestructive() ? false : direct);
          }
       } else {
-         super.addTail(ref, direct);
+         super.addTail(ref, isNonDestructive() ? false : direct);
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 7d96e28..c39381a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
@@ -25,6 +26,10 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -129,6 +134,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
       testNonDestructive(AMQPConnection, CoreConnection);
    }
 
+   @Test
+   public void testNonDestructiveLVQWithConsumerFirstCore() throws Exception {
+      testNonDestructiveLVQWithConsumerFirst(CoreConnection);
+   }
+
+   @Test
+   public void testNonDestructiveLVQWithConsumerFirstAMQP() throws Exception {
+      testNonDestructiveLVQWithConsumerFirst(AMQPConnection);
+   }
+
    public void testNonDestructive(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
       testNonDestructiveSingle(producerConnectionSupplier, consumerConnectionSupplier);
       testNonDestructiveDualConsumer(producerConnectionSupplier, consumerConnectionSupplier);
@@ -286,6 +301,63 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
       assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount());
    }
 
+   public void testNonDestructiveLVQWithConsumerFirst(ConnectionSupplier connectionSupplier) throws Exception {
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      CountDownLatch consumerSetup = new CountDownLatch(1);
+      CountDownLatch consumerComplete = new CountDownLatch(1);
+
+      /*
+       * Create the consumer before any messages are sent and keep it there so that the first message which arrives
+       * on the queue triggers direct delivery. Without the fix in this commit this essentially "poisons" the queue
+       * so that consumers can't get messages later.
+       */
+      executor.submit(() -> {
+         try (Connection connection = connectionSupplier.createConnection();
+              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+              MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
+            connection.start();
+            consumerSetup.countDown();
+            BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
+            assertNotNull(messageReceived);
+            consumerComplete.countDown();
+         } catch (Exception e) {
+            fail(e.getMessage());
+         }
+
+         consumerComplete.countDown();
+      });
+
+      // wait for the consumer thread to start and get everything setup
+      consumerSetup.await(5, TimeUnit.SECONDS);
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+         MessageProducer producer = session.createProducer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME));
+         BytesMessage message = session.createBytesMessage();
+         message.writeUTF("mills " + System.currentTimeMillis());
+         message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
+         producer.send(message);
+
+         // wait for the consumer to close then send another message
+         consumerComplete.await(5, TimeUnit.SECONDS);
+
+         message = session.createBytesMessage();
+         message.writeUTF("mills " + System.currentTimeMillis());
+         message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
+         producer.send(message);
+      }
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+           MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
+         connection.start();
+         BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
+         assertNotNull(messageReceived);
+      }
+
+      executor.shutdownNow();
+   }
+
    public void testNonDestructiveLVQTombstone(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
       int tombstoneTimeToLive = 500;