You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/18 00:25:39 UTC

svn commit: r1493958 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/

Author: tabish
Date: Mon Jun 17 22:25:39 2013
New Revision: 1493958

URL: http://svn.apache.org/r1493958
Log:
Fix and updated test for: https://issues.apache.org/jira/browse/AMQ-3405

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jun 17 22:25:39 2013
@@ -59,6 +59,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
@@ -1451,9 +1452,13 @@ public class Queue extends BaseDestinati
         BrokerSupport.resend(context, m.getMessage(), dest);
         removeMessage(context, m);
         messagesLock.writeLock().lock();
-        try{
+        try {
             messages.rollback(m.getMessageId());
-        }finally {
+            if (isDLQ()) {
+                DeadLetterStrategy stratagy = getDeadLetterStrategy();
+                stratagy.rollback(m.getMessage());
+            }
+        } finally {
             messagesLock.writeLock().unlock();
         }
         return true;

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Mon Jun 17 22:25:39 2013
@@ -24,16 +24,23 @@ import org.slf4j.LoggerFactory;
 /**
  * A strategy for choosing which destination is used for dead letter queue
  * messages.
- * 
- * 
+ *
  */
 public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
     private boolean processNonPersistent = false;
     private boolean processExpired = true;
     private boolean enableAudit = true;
-    private ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
+    private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
 
+    @Override
+    public void rollback(Message message) {
+        if (message != null && this.enableAudit) {
+            messageAudit.rollback(message);
+        }
+    }
+
+    @Override
     public boolean isSendToDeadLetterQueue(Message message) {
         boolean result = false;
         if (message != null) {
@@ -57,6 +64,7 @@ public abstract class AbstractDeadLetter
     /**
      * @return the processExpired
      */
+    @Override
     public boolean isProcessExpired() {
         return this.processExpired;
     }
@@ -64,6 +72,7 @@ public abstract class AbstractDeadLetter
     /**
      * @param processExpired the processExpired to set
      */
+    @Override
     public void setProcessExpired(boolean processExpired) {
         this.processExpired = processExpired;
     }
@@ -71,6 +80,7 @@ public abstract class AbstractDeadLetter
     /**
      * @return the processNonPersistent
      */
+    @Override
     public boolean isProcessNonPersistent() {
         return this.processNonPersistent;
     }
@@ -78,6 +88,7 @@ public abstract class AbstractDeadLetter
     /**
      * @param processNonPersistent the processNonPersistent to set
      */
+    @Override
     public void setProcessNonPersistent(boolean processNonPersistent) {
         this.processNonPersistent = processNonPersistent;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Mon Jun 17 22:25:39 2013
@@ -22,11 +22,11 @@ import org.apache.activemq.command.Messa
 
 /**
  * A strategy for choosing which destination is used for dead letter queue messages.
- * 
- * 
+ *
+ *
  */
 public interface DeadLetterStrategy {
-    
+
     /**
      * Allow pluggable strategy for deciding if message should be sent to a dead letter queue
      * for example, you might not want to ignore expired or non-persistent messages
@@ -62,4 +62,13 @@ public interface DeadLetterStrategy {
 
     public boolean isDLQ(ActiveMQDestination destination);
 
+    /**
+     * Allows for a Message that was already processed by a DLQ to be rolled back in case
+     * of a move or a retry of that message, otherwise the Message would be considered a
+     * duplicate if this strategy is doing Message Auditing.
+     *
+     * @param message
+     */
+    public void rollback(Message message);
+
 }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java Mon Jun 17 22:25:39 2013
@@ -39,7 +39,6 @@ import org.apache.activemq.RedeliveryPol
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -79,11 +78,9 @@ public class AMQ3405Test extends TestSup
         BrokerService broker = new BrokerService();
         broker.setPersistent(false);
         PolicyEntry policy = new PolicyEntry();
-        policy.setEnableAudit(false);
         DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
         if(defaultDeadLetterStrategy!=null) {
             defaultDeadLetterStrategy.setProcessNonPersistent(true);
-            ((AbstractDeadLetterStrategy) defaultDeadLetterStrategy).setEnableAudit(false);
         }
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);