You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/06/07 22:47:57 UTC

qpid-jms git commit: QPIDJMS-389 Signal the correct producer completion only

Repository: qpid-jms
Updated Branches:
  refs/heads/master fe4aa6ed9 -> 816773722


QPIDJMS-389 Signal the correct producer completion only

Ensure that when more than one producer exists on a session, the
remote close of one producer does not trigger completions for another
producer that is still waiting for its send to complete.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/81677372
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/81677372
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/81677372

Branch: refs/heads/master
Commit: 816773722df3abb23d1bca81aa64f353e57c0a02
Parents: fe4aa6e
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 7 18:47:40 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 7 18:47:40 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    |  22 ++-
 .../integration/ProducerIntegrationTest.java    | 176 ++++++++++++++++++-
 2 files changed, 187 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 782050e..b8c88f6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -1414,20 +1414,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             while (pending.hasNext()) {
                 SendCompletion completion = pending.next();
 
-                if (!completion.hasCompleted()) {
-                    if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+                if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+                    if (!completion.hasCompleted()) {
                         completion.markAsFailed(failureCause);
                     }
-                }
 
-                try {
-                    completion.signalCompletion();
-                } catch (Throwable error) {
-                    LOG.trace("Signaled completion of send: {}", completion.envelope);
+                    try {
+                        completion.signalCompletion();
+                    } catch (Throwable error) {
+                    } finally {
+                        LOG.trace("Signaled completion of send: {}", completion.envelope);
+                    }
                 }
             }
 
-            asyncSendQueue.clear();
+            // Only clear on non-discriminating variant to avoid losing track of completions.
+            if (producerId == null) {
+                asyncSendQueue.clear();
+            }
         }
     }
 
@@ -1530,6 +1534,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
 
         public void signalCompletion() {
+            envelope.getMessage().onSendComplete();  // Ensure message is returned as readable.
+
             if (failureCause == null) {
                 listener.onCompletion(envelope.getMessage());
             } else {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81677372/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 7cda887..8a8a96c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2397,7 +2397,6 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
 
                 @Override
                 public void onCompletion(Message message) {
-
                     try {
                         session.close();
                     } catch (JMSException jmsEx) {
@@ -2761,7 +2760,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             MessageProducer producer = session.createProducer(queue);
 
             int delay = 0;
-            if(deliveryDelay) {
+            if (deliveryDelay) {
                 delay = 123456;
                 producer.setDeliveryDelay(delay);
             }
@@ -2777,7 +2776,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
             MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
-            if(deliveryDelay) {
+            if (deliveryDelay) {
                 msgAnnotationsMatcher.withEntry(DELIVERY_TIME, inRange);
             }
             messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
@@ -2803,6 +2802,177 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create a second producer which allows for a safe wait for credit for the
+            // first producer without the need for a sleep.  Otherwise the first producer
+            // might not do an actual async send due to not having received credit yet.
+            session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+            message.setIntProperty("test", 1);
+
+            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+
+            // This closes link for the second producer we created, not the one that we
+            // will use to send a message.
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(100);
+
+            assertFalse("Should not get async callback", listener.awaitCompletion(10, TimeUnit.MILLISECONDS));
+
+            // Closing the session should complete the send with an exception
+            testPeer.expectEnd();
+            session.close();
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            // Message should be readable
+            assertNotNull("Should have a readable JMSDestination", message.getJMSDestination());
+            assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+            assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testRemotelyCloseProducerAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            final CountDownLatch producerClosed = new CountDownLatch(1);
+
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onProducerClosed(MessageProducer producer, Throwable cause) {
+                    producerClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+            Message message = session.createTextMessage("content");
+            message.setIntProperty("test", 1);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            testPeer.waitForAllHandlersToComplete(100);
+
+            assertTrue("Producer should have been closed", producerClosed.await(2, TimeUnit.SECONDS));
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+                fail("No expected exception for this send.");
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+            }
+
+            assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS));
+
+            // Message should be readable but not carry a destination as it wasn't actually sent anywhere
+            assertNull("Should not have a readable JMSDestination", message.getJMSDestination());
+            assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+            assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onSessionClosed(Session session, Throwable cause) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.remotelyEndLastOpenedSession(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+            Message message = session.createTextMessage("content");
+            message.setIntProperty("test", 1);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            testPeer.waitForAllHandlersToComplete(100);
+
+            assertTrue("Session should have been closed", sessionClosed.await(2, TimeUnit.SECONDS));
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+                fail("No expected exception for this send.");
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+            }
+
+            assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS));
+
+            // Message should be readable but not carry a destination as it wasn't actually sent anywhere
+            assertNull("Should not have a readable JMSDestination", message.getJMSDestination());
+            assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
+            assertEquals("Message property not as expected", 1, message.getIntProperty("test"));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     private class TestJmsCompletionListener implements CompletionListener {
 
         private final CountDownLatch completed;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org