You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2023/03/02 13:18:16 UTC
[qpid-proton-j] 07/09: PROTON-2687: clean up stale entries in transport work list that cant be actioned, and prevent them from entering where possible
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch 0.34.x
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
commit a4420f60abc06cc39187e8680bf4b4c6996d7fb7
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Thu Feb 23 20:02:41 2023 +0000
PROTON-2687: clean up stale entries in transport work list that cant be actioned, and prevent them from entering where possible
(cherry picked from commit 077bc2d2d374a342ec5b82589486d10067a0b528)
---
.../qpid/proton/engine/impl/DeliveryImpl.java | 6 +
.../qpid/proton/engine/impl/TransportImpl.java | 16 +-
.../qpid/proton/engine/impl/TransportSession.java | 11 +
.../qpid/proton/engine/impl/DeliveryImplTest.java | 2 +
.../qpid/proton/engine/impl/TransportImplTest.java | 222 +++++++++++++++++++++
5 files changed, 255 insertions(+), 2 deletions(-)
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 2928d376..432e8aa8 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -291,6 +291,12 @@ public class DeliveryImpl implements Delivery
void addToTransportWorkList()
{
+ TransportSession transportSession = getLink().getSession().getTransportSession();
+ if (transportSession != null && transportSession.endSent()) {
+ // Too late to action this work, dont add it to the transport work list.
+ return;
+ }
+
getLink().getConnectionImpl().addTransportWork(this);
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index aa304965..6653bd79 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -561,12 +561,18 @@ public class TransportImpl extends EndpointImpl
SessionImpl session = snd.getSession();
TransportSession tpSession = session.getTransportSession();
+ if (tpSession.endSent()) {
+ // Too late to action this work, clear it.
+ return true;
+ }
+
+ boolean localChannelSet = tpSession.isLocalChannelSet();
boolean wasDone = delivery.isDone();
if(!delivery.isDone() &&
(delivery.getDataLength() > 0 || delivery != snd.current()) &&
tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
- tpSession.isLocalChannelSet() &&
+ localChannelSet &&
tpLink.getLocalHandle() != null && !_frameWriter.isFull())
{
DeliveryImpl inProgress = tpLink.getInProgressDelivery();
@@ -676,7 +682,7 @@ public class TransportImpl extends EndpointImpl
}
}
- if(wasDone && delivery.getLocalState() != null)
+ if(wasDone && delivery.getLocalState() != null && localChannelSet)
{
TransportDelivery tpDelivery = delivery.getTransportDelivery();
// Use cached object as holder of data for immediate write to the FrameWriter
@@ -703,6 +709,11 @@ public class TransportImpl extends EndpointImpl
SessionImpl session = rcv.getSession();
TransportSession tpSession = session.getTransportSession();
+ if (tpSession.endSent()) {
+ // Too late to action this work, clear it.
+ return true;
+ }
+
if (tpSession.isLocalChannelSet())
{
boolean settled = delivery.isSettled();
@@ -1055,6 +1066,7 @@ public class TransportImpl extends EndpointImpl
}
writeFrame(channel, end, null, null);
+ transportSession.sentEnd();
}
endpoint.clearModified();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 0bab2912..d657ba95 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -70,6 +70,7 @@ class TransportSession
private int _unsettledIncomingSize;
private boolean _endReceived;
private boolean _beginSent;
+ private boolean _endSent;
TransportSession(TransportImpl transport, SessionImpl session)
{
@@ -527,4 +528,14 @@ class TransportSession
{
_beginSent = true;
}
+
+ public boolean endSent()
+ {
+ return _endSent;
+ }
+
+ public void sentEnd()
+ {
+ _endSent = true;
+ }
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
index cd390ae1..18143af8 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
@@ -735,7 +735,9 @@ public class DeliveryImplTest
private DeliveryImpl createSenderDelivery() {
LinkImpl link = Mockito.mock(SenderImpl.class);
ConnectionImpl connection = Mockito.mock(ConnectionImpl.class);
+ SessionImpl session = Mockito.mock(SessionImpl.class);
+ Mockito.when(link.getSession()).thenReturn(session);
Mockito.when(link.getConnectionImpl()).thenReturn(connection);
return new DeliveryImpl(null, link, null);
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 4d65343b..56e518bf 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -4895,4 +4895,226 @@ public class TransportImplTest
assertNoEvents(collector);
}
+
+ /**
+ * Verify that the transport work list doesnt retain deliveries+link+session on when a session
+ * is closed and freed while there is an active receiver link with deliveries still outstanding.
+ */
+ @Test
+ public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveReceiverWithOutstandingDeliveries()
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ ConnectionImpl connection = new ConnectionImpl();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "myClientReceiver";
+ Receiver receiver = session.receiver(linkName);
+ receiver.flow(5);
+ receiver.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
+
+ Delivery delivery = receiver.current();
+ assertNull("Should not yet have a delivery", delivery);
+
+ // Send the necessary responses to open/begin/attach as well as a transfer
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.SENDER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+ assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ String deliveryTag = "tag1";
+ String messageContent = "content1";
+ handleTransfer(transport, 1, deliveryTag, messageContent);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ delivery = verifyDelivery(receiver, deliveryTag, messageContent);
+ assertNotNull("Should now have a delivery", delivery);
+
+ delivery.disposition(Accepted.getInstance());
+
+ assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+
+ pumpMockTransport(transport);
+
+ assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead());
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Disposition);
+
+ session.close();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(5) instanceof End);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+ assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE);
+
+ // Send the necessary responses to End
+ End end = new End();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, end, null));
+
+ assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+
+ session.free();
+
+ pumpMockTransport(transport);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+ connection.close();
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(6) instanceof Close);
+ }
+
+ /**
+ * Verify that the transport doesnt retain deliveries+link+session when a session is closed
+ * and freed while there is an active sender link with deliveries still outstanding.
+ */
+ @Test
+ public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveSenderWithOutstandingDeliveries()
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ ConnectionImpl connection = new ConnectionImpl();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "myClientSender";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+ assertNull("Should not yet have a delivery", sender.current());
+
+ // Send the necessary responses to open/begin/attach as well as a transfer
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ // Give the necessary response to attach for sender and grant some credit
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.RECEIVER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ int credit = 10;
+ Flow flow = new Flow();
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ONE);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+
+ transport.handleFrame(new TransportFrame(0, flow, null));
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+ assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ assertEquals("Expected the sender to have credit", credit, sender.getCredit());
+
+ Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+ assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead());
+ assertEquals("Expected the sender to have 1 less credit", credit -1 , sender.getCredit());
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+ assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead());
+
+ session.close();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(4) instanceof End);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+ assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE);
+
+ // Send the necessary responses to End
+ End end = new End();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, end, null));
+
+ assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+
+ session.free();
+
+ pumpMockTransport(transport);
+
+ assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead());
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+
+ connection.close();
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org