You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/06/07 22:47:57 UTC
qpid-jms git commit: QPIDJMS-389 Signal the correct producer
completion only
Repository: qpid-jms
Updated Branches:
refs/heads/master fe4aa6ed9 -> 816773722
QPIDJMS-389 Signal the correct producer completion only
Ensure that if more than one producer exists on a session that the
remote close of one producer does not trigger completions for the other
producer that is waiting for its send to complete.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/81677372
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/81677372
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/81677372
Branch: refs/heads/master
Commit: 816773722df3abb23d1bca81aa64f353e57c0a02
Parents: fe4aa6e
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 7 18:47:40 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 7 18:47:40 2018 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 22 ++-
.../integration/ProducerIntegrationTest.java | 176 ++++++++++++++++++-
2 files changed, 187 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 782050e..b8c88f6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -1414,20 +1414,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
while (pending.hasNext()) {
SendCompletion completion = pending.next();
- if (!completion.hasCompleted()) {
- if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+ if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+ if (!completion.hasCompleted()) {
completion.markAsFailed(failureCause);
}
- }
- try {
- completion.signalCompletion();
- } catch (Throwable error) {
- LOG.trace("Signaled completion of send: {}", completion.envelope);
+ try {
+ completion.signalCompletion();
+ } catch (Throwable error) {
+ } finally {
+ LOG.trace("Signaled completion of send: {}", completion.envelope);
+ }
}
}
- asyncSendQueue.clear();
+ // Only clear on non-discriminating variant to avoid losing track of completions.
+ if (producerId == null) {
+ asyncSendQueue.clear();
+ }
}
}
@@ -1530,6 +1534,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
public void signalCompletion() {
+ envelope.getMessage().onSendComplete(); // Ensure message is returned as readable.
+
if (failureCause == null) {
listener.onCompletion(envelope.getMessage());
} else {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 7cda887..8a8a96c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2397,7 +2397,6 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
@Override
public void onCompletion(Message message) {
-
try {
session.close();
} catch (JMSException jmsEx) {
@@ -2761,7 +2760,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
MessageProducer producer = session.createProducer(queue);
int delay = 0;
- if(deliveryDelay) {
+ if (deliveryDelay) {
delay = 123456;
producer.setDeliveryDelay(delay);
}
@@ -2777,7 +2776,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- if(deliveryDelay) {
+ if (deliveryDelay) {
msgAnnotationsMatcher.withEntry(DELIVERY_TIME, inRange);
}
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
@@ -2803,6 +2802,177 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
}
+ @Test(timeout = 20000)
+ public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create a second producer which allows for a safe wait for credit for the
+ // first producer without the need for a sleep. Otherwise the first producer
+ // might not do an actual async send due to not having received credit yet.
+ session.createProducer(queue);
+
+ Message message = session.createTextMessage("content");
+ message.setIntProperty("test", 1);
+
+ testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+
+ // This closes link for the second producer we created, not the one that we
+ // will use to send a message.
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+ assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+ try {
+ producer.send(message, listener);
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(100);
+
+ assertFalse("Should not get async callback", listener.awaitCompletion(10, TimeUnit.MILLISECONDS));
+
+ // Closing the session should complete the send with an exception
+ testPeer.expectEnd();
+ session.close();
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertNotNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ // Message should be readable
+ assertNotNull("Should have a readable JMSDestination", message.getJMSDestination());
+ assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+ assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testRemotelyCloseProducerAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+ final CountDownLatch producerClosed = new CountDownLatch(1);
+
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onProducerClosed(MessageProducer producer, Throwable cause) {
+ producerClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createTextMessage("content");
+ message.setIntProperty("test", 1);
+
+ assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+ testPeer.waitForAllHandlersToComplete(100);
+
+ assertTrue("Producer should have been closed", producerClosed.await(2, TimeUnit.SECONDS));
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+ try {
+ producer.send(message, listener);
+ fail("No expected exception for this send.");
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ }
+
+ assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS));
+
+ // Message should be readable but not carry a destination as it wasn't actually sent anywhere
+ assertNull("Should not have a readable JMSDestination", message.getJMSDestination());
+ assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+ assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+ final CountDownLatch sessionClosed = new CountDownLatch(1);
+
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onSessionClosed(Session session, Throwable cause) {
+ sessionClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.remotelyEndLastOpenedSession(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createTextMessage("content");
+ message.setIntProperty("test", 1);
+
+ assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+ testPeer.waitForAllHandlersToComplete(100);
+
+ assertTrue("Session should have been closed", sessionClosed.await(2, TimeUnit.SECONDS));
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+ try {
+ producer.send(message, listener);
+ fail("No expected exception for this send.");
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ }
+
+ assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS));
+
+ // Message should be readable but not carry a destination as it wasn't actually sent anywhere
+ assertNull("Should not have a readable JMSDestination", message.getJMSDestination());
+ assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+ assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
private class TestJmsCompletionListener implements CompletionListener {
private final CountDownLatch completed;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org