You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/18 00:25:39 UTC
svn commit: r1493958 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/
Author: tabish
Date: Mon Jun 17 22:25:39 2013
New Revision: 1493958
URL: http://svn.apache.org/r1493958
Log:
Fix and updated test for: https://issues.apache.org/jira/browse/AMQ-3405
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jun 17 22:25:39 2013
@@ -59,6 +59,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
@@ -1451,9 +1452,13 @@ public class Queue extends BaseDestinati
BrokerSupport.resend(context, m.getMessage(), dest);
removeMessage(context, m);
messagesLock.writeLock().lock();
- try{
+ try {
messages.rollback(m.getMessageId());
- }finally {
+ if (isDLQ()) {
+ DeadLetterStrategy stratagy = getDeadLetterStrategy();
+ stratagy.rollback(m.getMessage());
+ }
+ } finally {
messagesLock.writeLock().unlock();
}
return true;
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Mon Jun 17 22:25:39 2013
@@ -24,16 +24,23 @@ import org.slf4j.LoggerFactory;
/**
* A strategy for choosing which destination is used for dead letter queue
* messages.
- *
- *
+ *
*/
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false;
private boolean processExpired = true;
private boolean enableAudit = true;
- private ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
+ private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
+ @Override
+ public void rollback(Message message) {
+ if (message != null && this.enableAudit) {
+ messageAudit.rollback(message);
+ }
+ }
+
+ @Override
public boolean isSendToDeadLetterQueue(Message message) {
boolean result = false;
if (message != null) {
@@ -57,6 +64,7 @@ public abstract class AbstractDeadLetter
/**
* @return the processExpired
*/
+ @Override
public boolean isProcessExpired() {
return this.processExpired;
}
@@ -64,6 +72,7 @@ public abstract class AbstractDeadLetter
/**
* @param processExpired the processExpired to set
*/
+ @Override
public void setProcessExpired(boolean processExpired) {
this.processExpired = processExpired;
}
@@ -71,6 +80,7 @@ public abstract class AbstractDeadLetter
/**
* @return the processNonPersistent
*/
+ @Override
public boolean isProcessNonPersistent() {
return this.processNonPersistent;
}
@@ -78,6 +88,7 @@ public abstract class AbstractDeadLetter
/**
* @param processNonPersistent the processNonPersistent to set
*/
+ @Override
public void setProcessNonPersistent(boolean processNonPersistent) {
this.processNonPersistent = processNonPersistent;
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Mon Jun 17 22:25:39 2013
@@ -22,11 +22,11 @@ import org.apache.activemq.command.Messa
/**
* A strategy for choosing which destination is used for dead letter queue messages.
- *
- *
+ *
+ *
*/
public interface DeadLetterStrategy {
-
+
/**
* Allow pluggable strategy for deciding if message should be sent to a dead letter queue
* for example, you might not want to ignore expired or non-persistent messages
@@ -62,4 +62,13 @@ public interface DeadLetterStrategy {
public boolean isDLQ(ActiveMQDestination destination);
+ /**
+ * Allows for a Message that was already processed by a DLQ to be rolled back in case
+ * of a move or a retry of that message; otherwise the Message would be considered a
+ * duplicate if this strategy is doing Message Auditing.
+ *
+ * @param message the Message being moved or retried that should be rolled back
+ */
+ public void rollback(Message message);
+
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java?rev=1493958&r1=1493957&r2=1493958&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java Mon Jun 17 22:25:39 2013
@@ -39,7 +39,6 @@ import org.apache.activemq.RedeliveryPol
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -79,11 +78,9 @@ public class AMQ3405Test extends TestSup
BrokerService broker = new BrokerService();
broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
- policy.setEnableAudit(false);
DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
if(defaultDeadLetterStrategy!=null) {
defaultDeadLetterStrategy.setProcessNonPersistent(true);
- ((AbstractDeadLetterStrategy) defaultDeadLetterStrategy).setEnableAudit(false);
}
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);