You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/12/17 06:04:33 UTC
[activemq-artemis] branch main updated: ARTEMIS-3834 include paged messages sending to DLA
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 6dd7965906 ARTEMIS-3834 include paged messages sending to DLA
new 135e3a17bc This closes #4079
6dd7965906 is described below
commit 6dd7965906cf626a5c398c7583df53a348a531cb
Author: AntonRoskvist <an...@volvo.com>
AuthorDate: Tue May 24 20:07:01 2022 +0200
ARTEMIS-3834 include paged messages sending to DLA
---
.../artemis/core/server/impl/QueueImpl.java | 33 +++++----
.../integration/management/QueueControlTest.java | 86 ++++++++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 2 +-
3 files changed, 107 insertions(+), 14 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e990162503..8802c6bc6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2576,27 +2576,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
}
+ if (pageIterator != null && !queueDestroyed) {
+ while (pageIterator.hasNext()) {
+ PagedReference ref = pageIterator.next();
+ if (ref.getMessage().getMessageID() == messageID) {
+ incDelivering(ref);
+ sendToDeadLetterAddress(null, ref);
+ pageIterator.remove();
+ refRemoved(ref);
+ return true;
+ }
+ }
+ }
return false;
}
}
@Override
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
- int count = 0;
- try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage())) {
- incDelivering(ref);
- sendToDeadLetterAddress(null, ref);
- iter.remove();
- refRemoved(ref);
- count++;
- }
+ return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
+
+ incDelivering(ref);
+ return sendToDeadLetterAddress(tx, ref);
}
- return count;
- }
+ });
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 123a143a13..b59147fb1b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1653,6 +1653,92 @@ public class QueueControlTest extends ManagementTestBase {
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
}
+ /**
+ * Tests that sending all messages to the DLA while the address is paging also includes the paged messages.
+ */
+ @Test
+ public void testSendToDLAIncludesPagedMessages() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("q1");
+ final SimpleString adName = new SimpleString("ad1");
+ final SimpleString dlq = new SimpleString("DLQ1");
+ final String sampleText = "Put me on DLQ";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+ server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
+
+ session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
+ session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+ // Send messages to the queue and make sure the address enters paging.
+ ClientProducer producer = session.createProducer(adName);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+ Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
+
+ // Send all messages to the DLA and make sure every one of them is sent
+ QueueControl queueControl = createManagementControl(adName, qName);
+ Assert.assertEquals(messageCount, queueControl.sendMessagesToDeadLetterAddress(null));
+ Assert.assertEquals(0, getMessageCount(queueControl));
+
+ // Make sure they all show up on the DLA
+ queueControl = createManagementControl(dla, dlq);
+ Assert.assertEquals(messageCount, getMessageCount(queueControl));
+
+ queueControl.removeAllMessages();
+
+ }
+
+ /**
+ * Tests that sending a single message to the DLA by its ID while the address is paging also works for a paged message.
+ */
+ @Test
+ public void testSendMessageToDLAIncludesPagedMessage() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("q1");
+ final SimpleString adName = new SimpleString("ad1");
+ final SimpleString dlq = new SimpleString("DLQ1");
+ final String sampleText = "Message Content";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+ server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
+
+ session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
+ session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+ // Send messages to the queue and make sure the address enters paging.
+ ClientProducer producer = session.createProducer(adName);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+ Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
+
+ // Send an identifiable message to the paging queue, then send it to the DLA by its message ID
+ producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique"));
+ QueueControl queueControl = createManagementControl(adName, qName);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ long messageID = (Long) messages[messageCount].get("messageID");
+
+ Assert.assertTrue(queueControl.sendMessageToDeadLetterAddress(messageID));
+ queueControl.removeAllMessages();
+
+ //Make sure it shows up on DLA
+ queueControl = createManagementControl(dla, dlq);
+ messages = queueControl.listMessages(null);
+ Assert.assertEquals(1, messages.length);
+ Assert.assertEquals("unique", (String) messages[0].get("myID"));
+
+ queueControl.removeAllMessages();
+
+ }
+
/**
* <ol>
* <li>send a message to queue</li>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 6bbcb160b8..5feca43665 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -545,7 +545,7 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
@Override
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
- return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
+ return (Integer) proxy.invokeOperation(Integer.class, "sendMessagesToDeadLetterAddress", filterStr);
}
@Override