You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2020/08/12 15:34:57 UTC

[qpid-jms] branch master updated: QPIDJMS-515 Do not recycle a delivery tag from a failed send operation

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 04a6cc6  QPIDJMS-515 Do not recycle a delivery tag from a failed send operation
04a6cc6 is described below

commit 04a6cc650bc4e4fbdef8521f3c32b7bbf3f46a82
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Aug 12 11:30:19 2020 -0400

    QPIDJMS-515 Do not recycle a delivery tag from a failed send operation
    
    When a send that has used a delivery tag fails, do not recycle the tag:
    the remote state is unknown, and the next send could reuse that tag and
    trigger an error, since tags are meant to be unique amongst the unsettled
    deliveries on the link.
---
 .../qpid/jms/provider/amqp/AmqpFixedProducer.java  | 14 ++++----
 .../jms/integration/ProducerIntegrationTest.java   | 38 ++++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       | 25 ++++++++++++--
 3 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index f7e9c2a..ad993bd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -400,19 +400,17 @@ public class AmqpFixedProducer extends AmqpProducer {
         private void handleSendCompletion(boolean successful) {
             setRequestTimeout(null);
 
+            // Null delivery means that we never had credit to send so no delivery was created to carry the message.
             if (getDelivery() != null) {
                 sent.remove(envelope.getMessageId());
                 delivery.settle();
-                tagGenerator.returnTag(delivery.getTag());
-            } else {
-                blocked.remove(envelope.getMessageId());
-            }
-
-            // Null delivery means that we never had credit to send so no delivery was created to carry the message.
-            if (delivery != null) {
-                DeliveryState remoteState = delivery.getRemoteState();
+                if (successful) {
+                    tagGenerator.returnTag(delivery.getTag());
+                }
+                final DeliveryState remoteState = delivery.getRemoteState();
                 tracer.completeSend(envelope.getMessage().getFacade(), remoteState == null ? null : remoteState.getType().name());
             } else {
+                blocked.remove(envelope.getMessageId());
                 tracer.completeSend(envelope.getMessage().getFacade(), null);
             }
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 5f74521..888b2d8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -3051,4 +3051,42 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testSendTimeoutDoesNotRecycleDeliveryTag() throws Exception {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+            // Expect the producer to attach, then grant it some credit. It should send
+            // a transfer that we deliberately do not respond to, which should cause the
+            // send operation to time out.
+            testPeer.expectSenderAttach();
+            testPeer.expectTransferButDoNotRespond(messageMatcher, equalTo(new Binary(new byte[] { 0 })));
+            testPeer.expectTransfer(messageMatcher, equalTo(new Binary(new byte[] { 1 })));
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            try {
+                producer.send(session.createTextMessage("text"));
+                fail("Send should fail after send timeout exceeded.");
+            } catch (JmsSendTimedOutException error) {
+            }
+
+            producer.send(session.createTextMessage("text"));
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 67b2582..6d54148 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -2167,11 +2167,21 @@ public class TestAmqpPeer implements AutoCloseable
         expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false);
     }
 
+    public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
+    {
+        expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, false, null, false, 0, 0);
+    }
+
     public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
     {
         expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
     }
 
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
+    {
+        expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, true, new Accepted(), true, 0, 0);
+    }
+
     public void expectTransfer(int frameSize)
     {
         expectTransfer(null, nullValue(), false, true, new Accepted(), true, frameSize, 0);
@@ -2189,9 +2199,17 @@ public class TestAmqpPeer implements AutoCloseable
         expectTransfer(expectedPayloadMatcher, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, 0, 0);
     }
 
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher,
+            Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
+            boolean responseSettled, int frameSize, long dispositionDelay)
+    {
+        expectTransfer(expectedPayloadMatcher, null, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, frameSize, dispositionDelay);
+    }
+
     //TODO: fix responseState to only admit applicable types.
-    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
-            boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled, int frameSize, long dispositionDelay)
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher,
+            Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
+            boolean responseSettled, int frameSize, long dispositionDelay)
     {
         Matcher<Boolean> settledMatcher = null;
         if(settled)
@@ -2207,6 +2225,9 @@ public class TestAmqpPeer implements AutoCloseable
         transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
         transferMatcher.withSettled(settledMatcher);
         transferMatcher.withState(stateMatcher);
+        if (deliveryTagMatcher != null) {
+            transferMatcher.withDeliveryTag(deliveryTagMatcher);
+        }
 
         if(sendResponseDisposition) {
             final DispositionFrame dispositionResponse = new DispositionFrame()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org