You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2023/03/01 16:29:03 UTC

[qpid-proton-j] 02/02: PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git

commit 8c1f2326d46b9a67ae14bc3431acc6cddfbb7524
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Mar 1 16:24:12 2023 +0000

    PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent
---
 .../qpid/proton/engine/impl/TransportImpl.java     |  6 ++
 .../qpid/proton/engine/impl/TransportLink.java     | 11 +++
 .../qpid/proton/engine/impl/TransportImplTest.java | 86 ++++++++++++++++++++++
 3 files changed, 103 insertions(+)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index da2e9abb..b9f07dc0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -469,6 +469,7 @@ public class TransportImpl extends EndpointImpl
                         }
 
                         writeFrame(transportSession.getLocalChannel(), detach, null, null);
+                        transportLink.sentDetach();
                     }
 
                     endpoint.clearModified();
@@ -699,6 +700,11 @@ public class TransportImpl extends EndpointImpl
             writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
         }
 
+        if(!wasDone && tpLink != null && tpLink.detachSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
         return !delivery.isBuffered();
     }
 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
index 836cf71c..0bd20cf1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
@@ -37,6 +37,7 @@ class TransportLink<T extends LinkImpl>
     private UnsignedInteger _remoteLinkCredit;
     private boolean _detachReceived;
     private boolean _attachSent;
+    private boolean _detachSent;
 
     protected TransportLink(T link)
     {
@@ -226,4 +227,14 @@ class TransportLink<T extends LinkImpl>
     {
         _remoteDeliveryCount = remoteDeliveryCount;
     }
+
+    public boolean detachSent()
+    {
+        return _detachSent;
+    }
+
+    public void sentDetach()
+    {
+        _detachSent = true;
+    }
 }
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 82ff4c81..87d845de 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -5109,4 +5109,90 @@ public class TransportImplTest
         assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
         assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
     }
+
+    /**
+     * Verify that the transport work list doesnt retain delivery (+link+session) object when a sender link is closed
+     * closed and freed while there a buffered/not-transferred message outstanding, as they can no longer be transferred.
+     */
+    @Test
+    public void testTransportWorkListDoesntLeakDeliveriesEtcFromSenderLinkFreedWithBufferedSend()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientSender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        assertNull("Should not yet have a delivery", sender.current());
+
+        // Send the necessary responses to open/begin/attach. DO NOT give any credit.
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        assertEquals("Expected the sender to have no credit", 0, sender.getCredit());
+
+        Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+        assertEquals("Expected the sender to have queued message", 1 , sender.getQueued());
+
+        pumpMockTransport(transport);
+
+        // Expect no more frames to have been sent, delivery cant be sent without credit
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+
+        // Send a remote request to close the sender and action it
+        Detach detach = new Detach();
+        detach.setHandle(UnsignedInteger.ZERO);
+        detach.setClosed(true);
+        transport.handleFrame(new TransportFrame(0, detach, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.CLOSED);
+
+        sender.close();
+        sender.free();
+
+        pumpMockTransport(transport);
+
+        assertEndpointState(sender, EndpointState.CLOSED, EndpointState.CLOSED);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Detach);
+
+        // Check the delivery isnt in the work list as it clearly cant be sent now the sender is closed.
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org