You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/11/01 11:26:47 UTC
activemq git commit: [AMQ-6847] limit the retry loop to one iteration
over all pending messages such that new additions are not replayed to avoid
duplicates
Repository: activemq
Updated Branches:
refs/heads/master 2ea5d1420 -> 03b19b9da
[AMQ-6847] limit the retry loop to one iteration over all pending messages such that new additions are not replayed to avoid duplicates
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/03b19b9d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/03b19b9d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/03b19b9d
Branch: refs/heads/master
Commit: 03b19b9da4d50c3bb8985f930e93596c7d994d26
Parents: 2ea5d14
Author: gtully <ga...@gmail.com>
Authored: Wed Nov 1 11:26:36 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Nov 1 11:26:36 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/activemq/broker/region/Queue.java | 10 +++++++---
.../java/org/apache/activemq/broker/jmx/MBeanTest.java | 10 +++++++---
2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/03b19b9d/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 4a2d272..04ef3fd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1484,8 +1484,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
try {
messages.rollback(m.getMessageId());
if (isDLQ()) {
- DeadLetterStrategy stratagy = getDeadLetterStrategy();
- stratagy.rollback(m.getMessage());
+ DeadLetterStrategy strategy = getDeadLetterStrategy();
+ strategy.rollback(m.getMessage());
}
} finally {
messagesLock.writeLock().unlock();
@@ -1569,6 +1569,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
throw new Exception("Retry of message is only possible on Dead Letter Queues!");
}
int restoredCounter = 0;
+ // ensure we deal with a snapshot to avoid potential duplicates in the event of messages
+ // getting immediate dlq'ed
+ long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount();
Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do {
doPageIn(true);
@@ -1580,6 +1583,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
+ numberOfRetryAttemptsToCheckAllMessagesOnce--;
if (ref.getMessage().getOriginalDestination() != null) {
moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
@@ -1589,7 +1593,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
}
- } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
+ } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount());
return restoredCounter;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/03b19b9d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index ecc6894..0ccf1cb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}});
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
+ QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
- assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
+ assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
+ LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));
dlq.retryMessages();
- assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() {
+ assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- LOG.info("Dlq size: " + dlq.getQueueSize());
+ LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));