You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2023/02/16 15:55:51 UTC
[activemq] branch main updated: AMQ-9217 - Fix IndividualDeadLetter strategy rollback
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 459388185 AMQ-9217 - Fix IndividualDeadLetter strategy rollback
new ae4e305f8 Merge pull request #980 from cshannon/amq-9217-rollback
459388185 is described below
commit 459388185a3acd3a175d304b08af3c638e870292
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Thu Feb 16 07:21:20 2023 -0500
AMQ-9217 - Fix IndividualDeadLetter strategy rollback
This fixes the rollback after the latest changes by using the
originalDestination property to look up the correct message audit on
rollback
---
.../broker/region/policy/AbstractDeadLetterStrategy.java | 6 +++---
.../broker/region/policy/IndividualDeadLetterStrategy.java | 14 +++++++++++---
.../broker/region/policy/SharedDeadLetterStrategy.java | 2 +-
3 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
index 82e07560e..242120681 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
@@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
@Override
public void rollback(Message message) {
if (message != null && this.enableAudit) {
- lookupActiveMQMessageAudit(message).rollback(message);
+ lookupActiveMQMessageAudit(message, true).rollback(message);
}
}
@@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
boolean result = false;
if (message != null) {
result = true;
- if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) {
+ if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) {
result = false;
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
}
@@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
public abstract int getMaxAuditDepth();
- protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message);
+ protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
index 3dd41ae0b..f626365fa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
@@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
}
@Override
- protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+ protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
ActiveMQMessageAudit messageAudit;
synchronized(dedicatedMessageAudits) {
- messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
+ // Normally we want to just use the destination property on the message as the key for the map for
+ // caching the messageAudit object for each destination. However, when rolling back, the message
+ // provided here has had its destination changed to the individual DLQ destination and is no longer
+ // the original destination. So to find the correct messageAudit to rollback we need to use
+ // the originalDestination property on the message to get the correct destination that was
+ // used to first cache the messageAudit.
+ final String destinationName = rollback && message.getOriginalDestination() != null ?
+ message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName();
+ messageAudit = dedicatedMessageAudits.get(destinationName);
if(messageAudit == null) {
messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
- dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit);
+ dedicatedMessageAudits.put(destinationName, messageAudit);
}
return messageAudit;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
index 8a78e83cf..a3dfe0e74 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
@@ -71,7 +71,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
}
@Override
- protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+ protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
return messageAudit;
}