You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/10/27 10:40:22 UTC
[2/2] activemq git commit: [AMQ-6847] pause dispatch for message move to avoid redelivery with pending ack/remove/audit rollback
[AMQ-6847] pause dispatch for message move to avoid redelivery with pending ack/remove/audit rollback
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2ea5d142
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2ea5d142
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2ea5d142
Branch: refs/heads/master
Commit: 2ea5d1420bbbf90bd151e19a75e6ca33c773f1f4
Parents: 005403e
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 27 11:40:06 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 27 11:40:06 2017 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 31 +++++++++----
.../apache/activemq/broker/jmx/MBeanTest.java | 47 ++++++++++++++++++++
2 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2ea5d142/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f440f76..4a2d272 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1471,18 +1471,33 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
* @throws Exception
*/
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
- BrokerSupport.resend(context, m.getMessage(), dest);
- removeMessage(context, m);
- messagesLock.writeLock().lock();
+ Set<Destination> destsToPause = regionBroker.getDestinations(dest);
try {
- messages.rollback(m.getMessageId());
- if (isDLQ()) {
- DeadLetterStrategy stratagy = getDeadLetterStrategy();
- stratagy.rollback(m.getMessage());
+ for (Destination d: destsToPause) {
+ if (d instanceof Queue) {
+ ((Queue)d).pauseDispatch();
+ }
+ }
+ BrokerSupport.resend(context, m.getMessage(), dest);
+ removeMessage(context, m);
+ messagesLock.writeLock().lock();
+ try {
+ messages.rollback(m.getMessageId());
+ if (isDLQ()) {
+ DeadLetterStrategy stratagy = getDeadLetterStrategy();
+ stratagy.rollback(m.getMessage());
+ }
+ } finally {
+ messagesLock.writeLock().unlock();
}
} finally {
- messagesLock.writeLock().unlock();
+ for (Destination d: destsToPause) {
+ if (d instanceof Queue) {
+ ((Queue)d).resumeDispatch();
+ }
+ }
}
+
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/2ea5d142/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index d72d709..ecc6894 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -180,6 +181,52 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("no forwards", 0, queueNew.getForwardCount());
}
+ public void testMoveFromDLQImmediateDLQ() throws Exception {
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setMaximumRedeliveries(0);
+ ((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(redeliveryPolicy);
+ Connection connection = connectionFactory.createConnection();
+
+ // populate
+ useConnection(connection);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination dest = session.createQueue(getDestinationString());
+ MessageConsumer consumer = session.createConsumer(dest);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ System.out.println("Received: " + message + " on " + message.getJMSDestination());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ throw new RuntimeException("Horrible exception");
+ }});
+
+
+ ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
+ QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
+
+ assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return MESSAGE_COUNT == dlq.getQueueSize();
+ }
+ }));
+
+ dlq.retryMessages();
+
+ assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Dlq size: " + dlq.getQueueSize());
+ return MESSAGE_COUNT == dlq.getQueueSize();
+ }
+ }));
+ }
+
//Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
// points to the need to except on a duplicate or have store.addMessage return boolean
// need some thought on how best to resolve this