You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2023/03/02 13:18:16 UTC

[qpid-proton-j] 07/09: PROTON-2687: clean up stale entries in transport work list that can't be actioned, and prevent them from entering where possible

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch 0.34.x
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git

commit a4420f60abc06cc39187e8680bf4b4c6996d7fb7
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Thu Feb 23 20:02:41 2023 +0000

    PROTON-2687: clean up stale entries in transport work list that can't be actioned, and prevent them from entering where possible
    
    (cherry picked from commit 077bc2d2d374a342ec5b82589486d10067a0b528)
---
 .../qpid/proton/engine/impl/DeliveryImpl.java      |   6 +
 .../qpid/proton/engine/impl/TransportImpl.java     |  16 +-
 .../qpid/proton/engine/impl/TransportSession.java  |  11 +
 .../qpid/proton/engine/impl/DeliveryImplTest.java  |   2 +
 .../qpid/proton/engine/impl/TransportImplTest.java | 222 +++++++++++++++++++++
 5 files changed, 255 insertions(+), 2 deletions(-)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 2928d376..432e8aa8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -291,6 +291,12 @@ public class DeliveryImpl implements Delivery
 
     void addToTransportWorkList()
     {
+        TransportSession transportSession = getLink().getSession().getTransportSession();
+        if (transportSession != null && transportSession.endSent()) {
+            // Too late to action this work, don't add it to the transport work list.
+            return;
+        }
+
         getLink().getConnectionImpl().addTransportWork(this);
     }
 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index aa304965..6653bd79 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -561,12 +561,18 @@ public class TransportImpl extends EndpointImpl
         SessionImpl session = snd.getSession();
         TransportSession tpSession = session.getTransportSession();
 
+        if (tpSession.endSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
+        boolean localChannelSet = tpSession.isLocalChannelSet();
         boolean wasDone = delivery.isDone();
 
         if(!delivery.isDone() &&
            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
-           tpSession.isLocalChannelSet() &&
+           localChannelSet &&
            tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
             DeliveryImpl inProgress = tpLink.getInProgressDelivery();
@@ -676,7 +682,7 @@ public class TransportImpl extends EndpointImpl
             }
         }
 
-        if(wasDone && delivery.getLocalState() != null)
+        if(wasDone && delivery.getLocalState() != null && localChannelSet)
         {
             TransportDelivery tpDelivery = delivery.getTransportDelivery();
             // Use cached object as holder of data for immediate write to the FrameWriter
@@ -703,6 +709,11 @@ public class TransportImpl extends EndpointImpl
         SessionImpl session = rcv.getSession();
         TransportSession tpSession = session.getTransportSession();
 
+        if (tpSession.endSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
         if (tpSession.isLocalChannelSet())
         {
             boolean settled = delivery.isSettled();
@@ -1055,6 +1066,7 @@ public class TransportImpl extends EndpointImpl
                         }
 
                         writeFrame(channel, end, null, null);
+                        transportSession.sentEnd();
                     }
 
                     endpoint.clearModified();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 0bab2912..d657ba95 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -70,6 +70,7 @@ class TransportSession
     private int _unsettledIncomingSize;
     private boolean _endReceived;
     private boolean _beginSent;
+    private boolean _endSent;
 
     TransportSession(TransportImpl transport, SessionImpl session)
     {
@@ -527,4 +528,14 @@ class TransportSession
     {
         _beginSent = true;
     }
+
+    public boolean endSent()
+    {
+        return _endSent;
+    }
+
+    public void sentEnd()
+    {
+        _endSent = true;
+    }
 }
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
index cd390ae1..18143af8 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
@@ -735,7 +735,9 @@ public class DeliveryImplTest
     private DeliveryImpl createSenderDelivery() {
         LinkImpl link = Mockito.mock(SenderImpl.class);
         ConnectionImpl connection = Mockito.mock(ConnectionImpl.class);
+        SessionImpl session = Mockito.mock(SessionImpl.class);
 
+        Mockito.when(link.getSession()).thenReturn(session);
         Mockito.when(link.getConnectionImpl()).thenReturn(connection);
 
         return new DeliveryImpl(null, link, null);
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 4d65343b..56e518bf 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -4895,4 +4895,226 @@ public class TransportImplTest
 
         assertNoEvents(collector);
     }
+
+    /**
+     * Verify that the transport work list doesn't retain the deliveries, link, or session when a session
+     * is closed and freed while there is an active receiver link with deliveries still outstanding.
+     */
+    @Test
+    public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveReceiverWithOutstandingDeliveries()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.flow(5);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
+
+        Delivery delivery = receiver.current();
+        assertNull("Should not yet have a delivery", delivery);
+
+        // Send the necessary responses to open/begin/attach as well as a transfer
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        String deliveryTag = "tag1";
+        String messageContent = "content1";
+        handleTransfer(transport, 1, deliveryTag, messageContent);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        delivery = verifyDelivery(receiver, deliveryTag, messageContent);
+        assertNotNull("Should now have a delivery", delivery);
+
+        delivery.disposition(Accepted.getInstance());
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead());
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Disposition);
+
+        session.close();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof End);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+        assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE);
+
+        // Send the necessary response to End. NOTE(review): the begin.set* calls below look like copy/paste leftovers; only 'end' is sent.
+        End end = new End();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, end, null));
+
+        assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+
+        session.free();
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(6) instanceof Close);
+    }
+
+    /**
+     * Verify that the transport doesn't retain the deliveries, link, or session when a session is closed
+     * and freed while there is an active sender link with deliveries still outstanding.
+     */
+    @Test
+    public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveSenderWithOutstandingDeliveries()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientSender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        assertNull("Should not yet have a delivery", sender.current());
+
+        // Send the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        // Give the necessary response to attach for sender and grant some credit
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int credit = 10;
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        assertEquals("Expected the sender to have credit", credit, sender.getCredit());
+
+        Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+        assertEquals("Expected the sender to have 1 less credit", credit -1 , sender.getCredit());
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+        assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead());
+
+        session.close();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof End);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+        assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE);
+
+        // Send the necessary response to End. NOTE(review): the begin.set* calls below look like copy/paste leftovers; only 'end' is sent.
+        End end = new End();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, end, null));
+
+        assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+
+        session.free();
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org