You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2023/03/01 16:29:03 UTC
[qpid-proton-j] 02/02: PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
commit 8c1f2326d46b9a67ae14bc3431acc6cddfbb7524
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Mar 1 16:24:12 2023 +0000
PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent
---
.../qpid/proton/engine/impl/TransportImpl.java | 6 ++
.../qpid/proton/engine/impl/TransportLink.java | 11 +++
.../qpid/proton/engine/impl/TransportImplTest.java | 86 ++++++++++++++++++++++
3 files changed, 103 insertions(+)
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index da2e9abb..b9f07dc0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -469,6 +469,7 @@ public class TransportImpl extends EndpointImpl
}
writeFrame(transportSession.getLocalChannel(), detach, null, null);
+ transportLink.sentDetach();
}
endpoint.clearModified();
@@ -699,6 +700,11 @@ public class TransportImpl extends EndpointImpl
writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
}
+ if(!wasDone && tpLink != null && tpLink.detachSent()) {
+ // Too late to action this work, clear it.
+ return true;
+ }
+
return !delivery.isBuffered();
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
index 836cf71c..0bd20cf1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
@@ -37,6 +37,7 @@ class TransportLink<T extends LinkImpl>
private UnsignedInteger _remoteLinkCredit;
private boolean _detachReceived;
private boolean _attachSent;
+ private boolean _detachSent;
protected TransportLink(T link)
{
@@ -226,4 +227,14 @@ class TransportLink<T extends LinkImpl>
{
_remoteDeliveryCount = remoteDeliveryCount;
}
+
+ public boolean detachSent()
+ {
+ return _detachSent;
+ }
+
+ public void sentDetach()
+ {
+ _detachSent = true;
+ }
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 82ff4c81..87d845de 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -5109,4 +5109,90 @@ public class TransportImplTest
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
}
+
+ /**
+ * Verify that the transport work list doesnt retain delivery (+link+session) object when a sender link is closed
+ * closed and freed while there a buffered/not-transferred message outstanding, as they can no longer be transferred.
+ */
+ @Test
+ public void testTransportWorkListDoesntLeakDeliveriesEtcFromSenderLinkFreedWithBufferedSend()
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ ConnectionImpl connection = new ConnectionImpl();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "myClientSender";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+ assertNull("Should not yet have a delivery", sender.current());
+
+ // Send the necessary responses to open/begin/attach. DO NOT give any credit.
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.RECEIVER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+ assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ assertEquals("Expected the sender to have no credit", 0, sender.getCredit());
+
+ Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+ assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+ assertEquals("Expected the sender to have queued message", 1 , sender.getQueued());
+
+ pumpMockTransport(transport);
+
+ // Expect no more frames to have been sent, delivery cant be sent without credit
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+
+ // Send a remote request to close the sender and action it
+ Detach detach = new Detach();
+ detach.setHandle(UnsignedInteger.ZERO);
+ detach.setClosed(true);
+ transport.handleFrame(new TransportFrame(0, detach, null));
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.CLOSED);
+
+ sender.close();
+ sender.free();
+
+ pumpMockTransport(transport);
+
+ assertEndpointState(sender, EndpointState.CLOSED, EndpointState.CLOSED);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Detach);
+
+ // Check the delivery isnt in the work list as it clearly cant be sent now the sender is closed.
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org