You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2023/03/01 16:29:01 UTC

[qpid-proton-j] branch main updated (077bc2d2 -> 8c1f2326)

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git


    from 077bc2d2 PROTON-2687: clean up stale entries in transport work list that cant be actioned, and prevent them from entering where possible
     new 9a8979e8 PROTON-2687: restore old gate to original structure, add additional protection to new gates, remove some extraneous leftovers from tests
     new 8c1f2326 PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../qpid/proton/engine/impl/TransportImpl.java     | 13 ++-
 .../qpid/proton/engine/impl/TransportLink.java     | 11 +++
 .../qpid/proton/engine/impl/TransportImplTest.java | 96 ++++++++++++++++++++--
 3 files changed, 107 insertions(+), 13 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton-j] 01/02: PROTON-2687: restore old gate to original structure, add additional protection to new gates, remove some extraneous leftovers from tests

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git

commit 9a8979e893c9186fb55c24eb32584e3f6f2d4d0b
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Mar 1 13:26:17 2023 +0000

    PROTON-2687: restore old gate to original structure, add additional protection to new gates, remove some extraneous leftovers from tests
---
 .../java/org/apache/qpid/proton/engine/impl/TransportImpl.java |  7 +++----
 .../org/apache/qpid/proton/engine/impl/TransportImplTest.java  | 10 +---------
 2 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 6653bd79..da2e9abb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -561,18 +561,17 @@ public class TransportImpl extends EndpointImpl
         SessionImpl session = snd.getSession();
         TransportSession tpSession = session.getTransportSession();
 
-        if (tpSession.endSent()) {
+        if (tpSession != null && tpSession.endSent()) {
             // Too late to action this work, clear it.
             return true;
         }
 
-        boolean localChannelSet = tpSession.isLocalChannelSet();
         boolean wasDone = delivery.isDone();
 
         if(!delivery.isDone() &&
            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
-           localChannelSet &&
+           tpSession.isLocalChannelSet() &&
            tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
             DeliveryImpl inProgress = tpLink.getInProgressDelivery();
@@ -682,7 +681,7 @@ public class TransportImpl extends EndpointImpl
             }
         }
 
-        if(wasDone && delivery.getLocalState() != null && localChannelSet)
+        if(wasDone && delivery.getLocalState() != null && tpSession.isLocalChannelSet())
         {
             TransportDelivery tpDelivery = delivery.getTransportDelivery();
             // Use cached object as holder of data for immediate write to the FrameWriter
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 56e518bf..82ff4c81 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -4981,10 +4981,6 @@ public class TransportImplTest
 
         // Send the necessary responses to End
         End end = new End();
-        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
-        begin.setNextOutgoingId(UnsignedInteger.ONE);
-        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
-        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
         transport.handleFrame(new TransportFrame(0, end, null));
 
         assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);
@@ -5035,7 +5031,7 @@ public class TransportImplTest
 
         assertNull("Should not yet have a delivery", sender.current());
 
-        // Send the necessary responses to open/begin/attach as well as a transfer
+        // Send the necessary responses to open/begin
         transport.handleFrame(new TransportFrame(0, new Open(), null));
 
         Begin begin = new Begin();
@@ -5094,10 +5090,6 @@ public class TransportImplTest
 
         // Send the necessary responses to End
         End end = new End();
-        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
-        begin.setNextOutgoingId(UnsignedInteger.ONE);
-        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
-        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
         transport.handleFrame(new TransportFrame(0, end, null));
 
         assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton-j] 02/02: PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git

commit 8c1f2326d46b9a67ae14bc3431acc6cddfbb7524
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Mar 1 16:24:12 2023 +0000

    PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent
---
 .../qpid/proton/engine/impl/TransportImpl.java     |  6 ++
 .../qpid/proton/engine/impl/TransportLink.java     | 11 +++
 .../qpid/proton/engine/impl/TransportImplTest.java | 86 ++++++++++++++++++++++
 3 files changed, 103 insertions(+)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index da2e9abb..b9f07dc0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -469,6 +469,7 @@ public class TransportImpl extends EndpointImpl
                         }
 
                         writeFrame(transportSession.getLocalChannel(), detach, null, null);
+                        transportLink.sentDetach();
                     }
 
                     endpoint.clearModified();
@@ -699,6 +700,11 @@ public class TransportImpl extends EndpointImpl
             writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
         }
 
+        if(!wasDone && tpLink != null && tpLink.detachSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
         return !delivery.isBuffered();
     }
 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
index 836cf71c..0bd20cf1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
@@ -37,6 +37,7 @@ class TransportLink<T extends LinkImpl>
     private UnsignedInteger _remoteLinkCredit;
     private boolean _detachReceived;
     private boolean _attachSent;
+    private boolean _detachSent;
 
     protected TransportLink(T link)
     {
@@ -226,4 +227,14 @@ class TransportLink<T extends LinkImpl>
     {
         _remoteDeliveryCount = remoteDeliveryCount;
     }
+
+    public boolean detachSent()
+    {
+        return _detachSent;
+    }
+
+    public void sentDetach()
+    {
+        _detachSent = true;
+    }
 }
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 82ff4c81..87d845de 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -5109,4 +5109,90 @@ public class TransportImplTest
         assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
         assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
     }
+
+    /**
+     * Verify that the transport work list doesn't retain the delivery (+link+session) objects when a sender link is
+     * closed and freed while there is a buffered/not-transferred message outstanding, as it can no longer be transferred.
+     */
+    @Test
+    public void testTransportWorkListDoesntLeakDeliveriesEtcFromSenderLinkFreedWithBufferedSend()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientSender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        assertNull("Should not yet have a delivery", sender.current());
+
+        // Send the necessary responses to open/begin/attach. DO NOT give any credit.
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        assertEquals("Expected the sender to have no credit", 0, sender.getCredit());
+
+        Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+        assertEquals("Expected the sender to have queued message", 1 , sender.getQueued());
+
+        pumpMockTransport(transport);
+
+        // Expect no more frames to have been sent; the delivery can't be sent without credit
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+
+        // Send a remote request to close the sender and action it
+        Detach detach = new Detach();
+        detach.setHandle(UnsignedInteger.ZERO);
+        detach.setClosed(true);
+        transport.handleFrame(new TransportFrame(0, detach, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.CLOSED);
+
+        sender.close();
+        sender.free();
+
+        pumpMockTransport(transport);
+
+        assertEndpointState(sender, EndpointState.CLOSED, EndpointState.CLOSED);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Detach);
+
+        // Check the delivery isn't in the work list, as it clearly can't be sent now that the sender is closed.
+        assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org