You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/11/20 22:06:48 UTC

qpid-jms git commit: QPIDJMS-434 Safe iteration over consumer that can close during

Repository: qpid-jms
Updated Branches:
  refs/heads/master a9f0ad020 -> 2186300e8


QPIDJMS-434 Safe iteration over consumer that can close during

Ensure that we safely iterate over the consumers when processing a
client acknowledge to ensure that deferred closures don't cause a
ConcurrentModificationException which will error out the acknowledge
operation.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2186300e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2186300e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2186300e

Branch: refs/heads/master
Commit: 2186300e879e1dc235e67d7b197c55b46618dc2b
Parents: a9f0ad0
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Nov 20 17:06:05 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Nov 20 17:06:05 2018 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpSession.java     |  5 +-
 .../integration/ConsumerIntegrationTest.java    | 80 +++++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 83f12f7..c71a485 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -71,7 +71,10 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
      *      controls the acknowledgement that is applied to each message.
      */
     public void acknowledge(final ACK_TYPE ackType) {
-        for (AmqpConsumer consumer : consumers.values()) {
+        // A consumer whose close was deferred will be closed and removed from the consumers
+        // map so we must copy the entries to safely traverse the collection during this operation.
+        List<AmqpConsumer> consumers = new ArrayList<>(this.consumers.values());
+        for (AmqpConsumer consumer : consumers) {
             consumer.acknowledge(ackType);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index fbc0e41..7be79eb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1590,7 +1590,85 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
-    public void testConsumerWithDeferredCloseActsAsClosed() throws Exception {
+    public void testCloseConsumersWithDeferredAckHandledLaterWhenlastConsumedMessageIsAcked() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final int DEFAULT_PREFETCH = 10;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-1"),
+                    1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 1, true);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-2"),
+                    1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 2, true);
+
+            final CountDownLatch expected = new CountDownLatch(2);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    expected.countDown();
+                }
+            });
+
+            // These are our two consumers, the first gets a message and abandons it, the second will have
+            // acknowledge called on its message which will lead to the message for the first to be acknowledged
+            // and then it's link will be closed.
+            MessageConsumer consumer1 = session.createConsumer(queue);
+            MessageConsumer consumer2 = session.createConsumer(queue);
+            Message receivedMessage1 = null;
+            Message receivedMessage2 = null;
+
+            // Ensure all the messages arrived so that the matching below is deterministic
+            assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
+
+            // Take our two messages from the queue leaving them in a delivered state.
+            receivedMessage1 = consumer1.receive(3000);
+            assertNotNull(receivedMessage1);
+            assertTrue(receivedMessage1 instanceof TextMessage);
+            receivedMessage2 = consumer2.receive(3000);
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+
+            // Expect the client to then drain off all credit from the link when "closed"
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+            // Expect the client to then drain off all credit from the link when "closed"
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+
+            // Close should be deferred as the messages were delivered but not acknowledged.
+            consumer1.close();
+            consumer2.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            // Now the links should close as we tear down the deferred consumers
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDetach(true, true, true);
+
+            // Acknowledge the last read message, which should accept all previous messages as well
+            // and our consumers should then close their links in turn.
+            receivedMessage2.acknowledge();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConsumerWithDeferredCloseAcksAsClosed() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             final int DEFAULT_PREFETCH = 100;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org