You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/11/20 22:06:48 UTC
qpid-jms git commit: QPIDJMS-434 Safe iteration over consumer that
can close during
Repository: qpid-jms
Updated Branches:
refs/heads/master a9f0ad020 -> 2186300e8
QPIDJMS-434 Safe iteration over consumer that can close during
Ensure that we safely iterate over the consumers when processing a
client acknowledge to ensure that deferred closures don't cause a
ConcurrentModificationException which will error out the acknowledge
operation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2186300e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2186300e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2186300e
Branch: refs/heads/master
Commit: 2186300e879e1dc235e67d7b197c55b46618dc2b
Parents: a9f0ad0
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Nov 20 17:06:05 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Nov 20 17:06:05 2018 -0500
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpSession.java | 5 +-
.../integration/ConsumerIntegrationTest.java | 80 +++++++++++++++++++-
2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 83f12f7..c71a485 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -71,7 +71,10 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
* controls the acknowledgement that is applied to each message.
*/
public void acknowledge(final ACK_TYPE ackType) {
- for (AmqpConsumer consumer : consumers.values()) {
+ // A consumer whose close was deferred will be closed and removed from the consumers
+ // map so we must copy the entries to safely traverse the collection during this operation.
+ List<AmqpConsumer> consumers = new ArrayList<>(this.consumers.values());
+ for (AmqpConsumer consumer : consumers) {
consumer.acknowledge(ackType);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2186300e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index fbc0e41..7be79eb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1590,7 +1590,85 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
- public void testConsumerWithDeferredCloseActsAsClosed() throws Exception {
+ public void testCloseConsumersWithDeferredAckHandledLaterWhenlastConsumedMessageIsAcked() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final int DEFAULT_PREFETCH = 10;
+
+ // Set to fixed known value to reduce breakage if defaults are changed.
+ Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getTestName());
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-1"),
+ 1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 1, true);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-2"),
+ 1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 2, true);
+
+ final CountDownLatch expected = new CountDownLatch(2);
+ ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ expected.countDown();
+ }
+ });
+
+ // These are our two consumers, the first gets a message and abandons it, the second will have
+ // acknowledge called on its message which will lead to the message for the first to be acknowledged
+ // and then it's link will be closed.
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ Message receivedMessage1 = null;
+ Message receivedMessage2 = null;
+
+ // Ensure all the messages arrived so that the matching below is deterministic
+ assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
+
+ // Take our two messages from the queue leaving them in a delivered state.
+ receivedMessage1 = consumer1.receive(3000);
+ assertNotNull(receivedMessage1);
+ assertTrue(receivedMessage1 instanceof TextMessage);
+ receivedMessage2 = consumer2.receive(3000);
+ assertNotNull(receivedMessage2);
+ assertTrue(receivedMessage2 instanceof TextMessage);
+
+ // Expect the client to then drain off all credit from the link when "closed"
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+ // Expect the client to then drain off all credit from the link when "closed"
+ testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+
+ // Close should be deferred as the messages were delivered but not acknowledged.
+ consumer1.close();
+ consumer2.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+
+ // Now the links should close as we tear down the deferred consumers
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectDetach(true, true, true);
+
+ // Acknowledge the last read message, which should accept all previous messages as well
+ // and our consumers should then close their links in turn.
+ receivedMessage2.acknowledge();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testConsumerWithDeferredCloseAcksAsClosed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int DEFAULT_PREFETCH = 100;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org