You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/07/15 16:45:57 UTC

[qpid-jms] branch master updated: QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b7bc81  QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
9b7bc81 is described below

commit 9b7bc819f68d4d41bed9f2a27887decfdc675ff7
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Jul 15 17:38:25 2020 +0100

    QPIDJMS-509: send disposition when closing consumer inside onmessage after recover, or when recovering after closing consumer with unacked messages
---
 .../qpid/jms/provider/amqp/AmqpConsumer.java       | 16 +++--
 .../jms/integration/ConsumerIntegrationTest.java   | 69 +++++++++++++++++++++-
 .../jms/integration/SessionIntegrationTest.java    | 44 ++++++++++++++
 3 files changed, 122 insertions(+), 7 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 202a9e7..c5990e5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -55,6 +55,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
+    private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
     protected final AmqpSession session;
     protected final int acknowledgementMode;
     protected AsyncResult stopRequest;
@@ -86,7 +88,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private void acknowledgeUndeliveredRecoveredMessages() {
-        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
+        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
+                || acknowledgementMode == Session.AUTO_ACKNOWLEDGE
+                    || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE
+                        || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) {
             // Send dispositions for any messages which were previously delivered and
             // session recovered, but were then not delivered again afterwards.
             Delivery delivery = getEndpoint().head();
@@ -440,9 +445,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     envelope.getMessage().getFacade().getRedeliveryCount() + 1);
                 envelope.setEnqueueFirst(true);
                 envelope.setDelivered(false);
-                if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
-                    envelope.setRecovered(true);
-                }
+                envelope.setRecovered(true);
 
                 redispatchList.add(envelope);
             }
@@ -458,6 +461,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         while (reverseIterator.hasPrevious()) {
             deliver(reverseIterator.previous());
         }
+
+        if(deferredClose) {
+            acknowledgeUndeliveredRecoveredMessages();
+            tryCompleteDeferredClose();
+        }
     }
 
     /**
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 9fcec81..5d1572b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -87,6 +87,8 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class);
 
+    private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
     @Test(timeout = 20000)
@@ -1172,6 +1174,52 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testMessageListenerClosesItsConsumer() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverAutoAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverClientAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverDupsOk() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, Session.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerAfterRecoverIndividualAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(true, false, INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverAutoAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverClientAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverDupsOk() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, Session.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumerBeforeRecoverIndividualAck() throws Exception {
+        doMessageListenerClosesItsConsumerTestImpl(false, true, INDIVIDUAL_ACKNOWLEDGE);
+    }
+
+    private void doMessageListenerClosesItsConsumerTestImpl(boolean recoverAfterClose, boolean recoverBeforeClose, int ackMode) throws Exception {
+        assertFalse("Cant recover a transacted session", ackMode == Session.SESSION_TRANSACTED);
+
         final CountDownLatch latch = new CountDownLatch(1);
         final CountDownLatch exceptionListenerFired = new CountDownLatch(1);
         final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -1190,7 +1238,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectBegin();
 
-            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Session session = connection.createSession(false, ackMode);
             Queue destination = session.createQueue(getTestName());
             connection.start();
 
@@ -1199,15 +1247,30 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             MessageConsumer consumer = session.createConsumer(destination);
 
-            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
-            testPeer.expectDisposition(true, new AcceptedMatcher());
+            if(recoverBeforeClose) {
+                testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+            } else if(recoverAfterClose) {
+                testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
+                testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)));
+            } else {
+                testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
+                testPeer.expectDisposition(true, new AcceptedMatcher());
+            }
             testPeer.expectDetach(true, true, true);
 
             consumer.setMessageListener(new MessageListener() {
                 @Override
                 public void onMessage(Message m) {
                     try {
+                        if(recoverBeforeClose) {
+                            session.recover();
+                        }
+
                         consumer.close();
+
+                        if(recoverAfterClose) {
+                            session.recover();
+                        }
                     } catch (Throwable t) {
                         error.set(t);
                         LOG.error("Unexpected error during close", t);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 0e19c07..4222a9a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -2266,6 +2266,50 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCloseConsumerWithUnackedClientAckMessagesThenRecoverSession() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
+
+            int msgCount = 2;
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
+                    Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, true);
+
+            Queue destination = session.createQueue(getTestName());
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            TextMessage receivedTextMessage = null;
+            assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000));
+            assertEquals("Unexpected delivery number", 1,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+            assertNotNull("Expected a message", receivedTextMessage = (TextMessage) consumer.receive(3000));
+            assertEquals("Unexpected delivery number", 2,  receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
+
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH - msgCount)));
+
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 1, 1);
+            testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
+            testPeer.expectDetach(true, true, true);
+
+            session.recover();
+
+            // Verify the expectations happen in response to the recover() and not the following close().
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testRecoveredClientAckSessionWithDurableSubscriber() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org