You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/12/17 06:04:33 UTC

[activemq-artemis] branch main updated: ARTEMIS-3834 include paged messages sending to DLA

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dd7965906 ARTEMIS-3834 include paged messages sending to DLA
     new 135e3a17bc This closes #4079
6dd7965906 is described below

commit 6dd7965906cf626a5c398c7583df53a348a531cb
Author: AntonRoskvist <an...@volvo.com>
AuthorDate: Tue May 24 20:07:01 2022 +0200

    ARTEMIS-3834 include paged messages sending to DLA
---
 .../artemis/core/server/impl/QueueImpl.java        | 33 +++++----
 .../integration/management/QueueControlTest.java   | 86 ++++++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java      |  2 +-
 3 files changed, 107 insertions(+), 14 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e990162503..8802c6bc6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2576,27 +2576,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                return true;
             }
          }
+         if (pageIterator != null && !queueDestroyed) {
+            while (pageIterator.hasNext()) {
+               PagedReference ref = pageIterator.next();
+               if (ref.getMessage().getMessageID() == messageID) {
+                  incDelivering(ref);
+                  sendToDeadLetterAddress(null, ref);
+                  pageIterator.remove();
+                  refRemoved(ref);
+                  return true;
+               }
+            }
+         }
          return false;
       }
    }
 
    @Override
    public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
-      int count = 0;
 
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (filter == null || filter.match(ref.getMessage())) {
-               incDelivering(ref);
-               sendToDeadLetterAddress(null, ref);
-               iter.remove();
-               refRemoved(ref);
-               count++;
-            }
+      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
+
+            incDelivering(ref);
+            return sendToDeadLetterAddress(tx, ref);
          }
-         return count;
-      }
+      });
    }
 
    @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 123a143a13..b59147fb1b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1653,6 +1653,92 @@ public class QueueControlTest extends ManagementTestBase {
       assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
    }
 
+   /**
+    * Test send to DLA while paging includes paged messages
+    */
+   @Test
+   public void testSendToDLAIncludesPagedMessages() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final SimpleString dlq = new SimpleString("DLQ1");
+      final String sampleText = "Put me on DLQ";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+      server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
+
+      session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
+      session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+      // Send message to queue, make sure address enters paging.
+      ClientProducer producer = session.createProducer(adName);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
+
+      //Send all messages to DLA, make sure all are sent
+      QueueControl queueControl = createManagementControl(adName, qName);
+      Assert.assertEquals(messageCount, queueControl.sendMessagesToDeadLetterAddress(null));
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      //Make sure all shows up on DLA
+      queueControl = createManagementControl(dla, dlq);
+      Assert.assertEquals(messageCount, getMessageCount(queueControl));
+
+      queueControl.removeAllMessages();
+
+   }
+
+   /**
+    * Test send single message to DLA while paging includes paged message
+    */
+   @Test
+   public void testSendMessageToDLAIncludesPagedMessage() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final SimpleString dlq = new SimpleString("DLQ1");
+      final String sampleText = "Message Content";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+      server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
+
+      session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
+      session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+      // Send message to queue, make sure address enters paging.
+      ClientProducer producer = session.createProducer(adName);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
+
+      //Send identifiable message to DLA
+      producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique"));
+      QueueControl queueControl = createManagementControl(adName, qName);
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      long messageID = (Long) messages[messageCount].get("messageID");
+
+      Assert.assertTrue(queueControl.sendMessageToDeadLetterAddress(messageID));
+      queueControl.removeAllMessages();
+
+      //Make sure it shows up on DLA
+      queueControl = createManagementControl(dla, dlq);
+      messages = queueControl.listMessages(null);
+      Assert.assertEquals(1, messages.length);
+      Assert.assertEquals("unique", (String) messages[0].get("myID"));
+
+      queueControl.removeAllMessages();
+
+   }
+
    /**
     * <ol>
     * <li>send a message to queue</li>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 6bbcb160b8..5feca43665 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -545,7 +545,7 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
 
          @Override
          public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
-            return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
+            return (Integer) proxy.invokeOperation(Integer.class, "sendMessagesToDeadLetterAddress", filterStr);
          }
 
          @Override