You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2023/02/16 15:55:51 UTC

[activemq] branch main updated: AMQ-9217 - Fix IndividualDeadLetter strategy rollback

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 459388185 AMQ-9217 - Fix IndividualDeadLetter strategy rollback
     new ae4e305f8 Merge pull request #980 from cshannon/amq-9217-rollback
459388185 is described below

commit 459388185a3acd3a175d304b08af3c638e870292
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Thu Feb 16 07:21:20 2023 -0500

    AMQ-9217 - Fix IndividualDeadLetter strategy rollback
    
    This fixes rollback after the latest changes by using the
    originalDestination property to look up the correct message audit
    when rolling back.
---
 .../broker/region/policy/AbstractDeadLetterStrategy.java   |  6 +++---
 .../broker/region/policy/IndividualDeadLetterStrategy.java | 14 +++++++++++---
 .../broker/region/policy/SharedDeadLetterStrategy.java     |  2 +-
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
index 82e07560e..242120681 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
@@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
     @Override
     public void rollback(Message message) {
         if (message != null && this.enableAudit) {
-            lookupActiveMQMessageAudit(message).rollback(message);
+            lookupActiveMQMessageAudit(message, true).rollback(message);
         }
     }
 
@@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
         boolean result = false;
         if (message != null) {
             result = true;
-            if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) {
+            if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) {
                 result = false;
                 LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
             }
@@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
 
     public abstract int getMaxAuditDepth();
 
-    protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message);
+    protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback);
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
index 3dd41ae0b..f626365fa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
@@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
     }
 
     @Override
-    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
         ActiveMQMessageAudit messageAudit;
 
         synchronized(dedicatedMessageAudits) {
-            messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
+            // Normally we want to just use the destination property on the message as the key for the map for
+            // caching the messageAudit object for each destination. However, when rolling back, the message
+            // provided here has had its destination changed to the individual DLQ destination and is no longer
+            // the original destination. So to find the correct messageAudit to roll back, we need to use
+            // the originalDestination property on the message to get the correct destination that was
+            // used to first cache the messageAudit.
+            final String destinationName = rollback && message.getOriginalDestination() != null ?
+                message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName();
+            messageAudit = dedicatedMessageAudits.get(destinationName);
 
             if(messageAudit == null) {
                 messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
-                dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit);
+                dedicatedMessageAudits.put(destinationName, messageAudit);
             }
 
             return messageAudit;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
index 8a78e83cf..a3dfe0e74 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
@@ -71,7 +71,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
     }
 
     @Override
-    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
         return messageAudit;
     }