You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/07/15 16:45:57 UTC
[qpid-jms] branch master updated: QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new 9b7bc81 QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
9b7bc81 is described below
commit 9b7bc819f68d4d41bed9f2a27887decfdc675ff7
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Jul 15 17:38:25 2020 +0100
QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
---
.../qpid/jms/provider/amqp/AmqpConsumer.java | 16 +++--
.../jms/integration/ConsumerIntegrationTest.java | 69 +++++++++++++++++++++-
.../jms/integration/SessionIntegrationTest.java | 44 ++++++++++++++
3 files changed, 122 insertions(+), 7 deletions(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 202a9e7..c5990e5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -55,6 +55,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
+ private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
protected final AmqpSession session;
protected final int acknowledgementMode;
protected AsyncResult stopRequest;
@@ -86,7 +88,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
private void acknowledgeUndeliveredRecoveredMessages() {
- if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+ if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
+ || acknowledgementMode == Session.AUTO_ACKNOWLEDGE
+ || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE
+ || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) {
// Send dispositions for any messages which were previously delivered and
// session recovered, but were then not delivered again afterwards.
Delivery delivery = getEndpoint().head();
@@ -440,9 +445,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
envelope.getMessage().getFacade().getRedeliveryCount() + 1);
envelope.setEnqueueFirst(true);
envelope.setDelivered(false);
- if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
- envelope.setRecovered(true);
- }
+ envelope.setRecovered(true);
redispatchList.add(envelope);
}
@@ -458,6 +461,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
while (reverseIterator.hasPrevious()) {
deliver(reverseIterator.previous());
}
+
+ if(deferredClose) {
+ acknowledgeUndeliveredRecoveredMessages();
+ tryCompleteDeferredClose();
+ }
}
/**
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 9fcec81..5d1572b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -87,6 +87,8 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
+ private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout = 20000)
@@ -1172,6 +1174,52 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
@Test(timeout=20000)
public void testMessageListenerClosesItsConsumer() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(false, false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerAfterRecoverAutoAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(true, false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerAfterRecoverClientAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(true, false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerAfterRecoverDupsOk() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(true, false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerAfterRecoverIndividualAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(true, false, INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerBeforeRecoverAutoAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(false, true, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerBeforeRecoverClientAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(false, true, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerBeforeRecoverDupsOk() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(false, true, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerClosesItsConsumerBeforeRecoverIndividualAck() throws Exception {
+ doMessageListenerClosesItsConsumerTestImpl(false, true, INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ private void doMessageListenerClosesItsConsumerTestImpl(boolean recoverAfterClose, boolean recoverBeforeClose, int ackMode) throws Exception {
+ assertFalse("Cant recover a transacted session", ackMode == Session.SESSION_TRANSACTED);
+
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch exceptionListenerFired = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -1190,7 +1238,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
testPeer.expectBegin();
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session = connection.createSession(false, ackMode);
Queue destination = session.createQueue(getTestName());
connection.start();
@@ -1199,15 +1247,30 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
MessageConsumer consumer = session.createConsumer(destination);
- testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
- testPeer.expectDisposition(true, new AcceptedMatcher());
+ if(recoverBeforeClose) {
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+ } else if(recoverAfterClose) {
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+ } else {
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+ }
testPeer.expectDetach(true, true, true);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
+ if(recoverBeforeClose) {
+ session.recover();
+ }
+
consumer.close();
+
+ if(recoverAfterClose) {
+ session.recover();
+ }
} catch (Throwable t) {
error.set(t);
LOG.error("Unexpected error during close", t);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 0e19c07..4222a9a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -2266,6 +2266,50 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCloseConsumerWithUnackedClientAckMessagesThenRecoverSession() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+ int msgCount = 2;
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+ Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, true);
+
+ Queue destination = session.createQueue(getTestName());
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ TextMessage receivedTextMessage = null;
+ assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number", 1, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+ assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000));
+ assertEquals("Unexpected delivery number", 2, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount)));
+
+ consumer.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
+ testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
+ testPeer.expectDetach(true, true, true);
+
+ session.recover();
+
+ // Verify the expectations happen in response to the recover() and not the following close().
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
public void testRecoveredClientAckSessionWithDurableSubscriber() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org