You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/10/14 15:28:20 UTC
[qpid-proton-j] branch master updated: PROTON-2284: track half-open
sender and receiver links separately to resolve upon peer attach
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
The following commit(s) were added to refs/heads/master by this push:
new 754d40e PROTON-2284: track half-open sender and receiver links separately to resolve upon peer attach
754d40e is described below
commit 754d40e2a2e58aab7a252d933daef6d572f9f508
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Oct 14 16:21:47 2020 +0100
PROTON-2284: track half-open sender and receiver links separately to resolve upon peer attach
---
.../qpid/proton/engine/impl/TransportImpl.java | 8 +-
.../qpid/proton/engine/impl/TransportSession.java | 25 +-
.../qpid/proton/engine/impl/TransportImplTest.java | 304 ++++++++++++++++++++-
3 files changed, 328 insertions(+), 9 deletions(-)
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 7307aff..aa30496 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -800,11 +800,12 @@ public class TransportImpl extends EndpointImpl
&& !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED)
{
+ Role role = endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER;
UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink);
if(link.getRemoteState() == EndpointState.UNINITIALIZED)
{
- transportSession.addHalfOpenLink(transportLink);
+ transportSession.addHalfOpenLink(transportLink, Role.SENDER == role);
}
Attach attach = new Attach();
@@ -851,7 +852,7 @@ public class TransportImpl extends EndpointImpl
attach.setMaxMessageSize(link.getMaxMessageSize());
}
- attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
+ attach.setRole(role);
if(link instanceof SenderImpl)
{
@@ -1268,7 +1269,8 @@ public class TransportImpl extends EndpointImpl
}
else
{
- transportLink = transportSession.resolveHalfOpenLink(attach.getName());
+ // We flip the peer role to determine our local role, and try to resolve an existing half-open link
+ transportLink = transportSession.resolveHalfOpenLink(attach.getName(), Role.RECEIVER == attach.getRole());
if(transportLink == null)
{
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index e473675..0bab291 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -54,7 +54,8 @@ class TransportSession
private final Map<UnsignedInteger, TransportLink<?>> _remoteHandlesMap = new HashMap<UnsignedInteger, TransportLink<?>>();
private final Map<UnsignedInteger, TransportLink<?>> _localHandlesMap = new HashMap<UnsignedInteger, TransportLink<?>>();
- private final Map<String, TransportLink> _halfOpenLinks = new HashMap<String, TransportLink>();
+ private final Map<String, TransportLink> _halfOpenSenderLinks = new HashMap<String, TransportLink>();
+ private final Map<String, TransportLink> _halfOpenReceiverLinks = new HashMap<String, TransportLink>();
private UnsignedInteger _incomingDeliveryId = null;
@@ -251,14 +252,28 @@ class TransportSession
_remoteHandlesMap.remove(handle);
}
- public TransportLink resolveHalfOpenLink(String name)
+ public TransportLink resolveHalfOpenLink(String name, boolean isSender)
{
- return _halfOpenLinks.remove(name);
+ if(isSender)
+ {
+ return _halfOpenSenderLinks.remove(name);
+ }
+ else
+ {
+ return _halfOpenReceiverLinks.remove(name);
+ }
}
- public void addHalfOpenLink(TransportLink link)
+ public void addHalfOpenLink(TransportLink link, boolean isSender)
{
- _halfOpenLinks.put(link.getName(), link);
+ if(isSender)
+ {
+ _halfOpenSenderLinks.put(link.getName(), link);
+ }
+ else
+ {
+ _halfOpenReceiverLinks.put(link.getName(), link);
+ }
}
public void handleTransfer(Transfer transfer, Binary payload)
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 5217421..a40da89 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
@@ -67,6 +68,7 @@ import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
@@ -2973,6 +2975,11 @@ public class TransportImplTest
assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
}
+ private void handleTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] payload)
+ {
+ handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, payload, false);
+ }
+
private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
{
handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
@@ -4143,7 +4150,7 @@ public class TransportImplTest
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
- attach.setRole(Role.SENDER);
+ attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
@@ -4590,4 +4597,299 @@ public class TransportImplTest
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
}
+
+ @Test
+ public void testParallelOpenOfSenderAndReceiverLinksWithSameName()
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+
+ assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT,
+ Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ // Give the necessary responses to open/begin
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ final UnsignedInteger receiverHandle = UnsignedInteger.ZERO;
+ final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+
+ // Open a receiver and a sender with same link name in parallel, i.e send both
+ // attaches, such that both are locally open but neither is yet remotely open.
+ String linkName = "myLinkName";
+ Receiver receiver = session.receiver(linkName);
+ assertEndpointState(receiver, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+
+ Sender sender = session.sender(linkName);
+ assertEndpointState(sender, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+
+ receiver.open();
+ sender.open();
+
+ assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+ assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+ Event.Type.LINK_INIT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT,
+ Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+ Attach receiverAttach = (Attach) transport.writes.get(2);
+ validateAttach(receiverAttach, true, linkName, receiverHandle);
+
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
+ Attach senderAttach = (Attach) transport.writes.get(3);
+ validateAttach(senderAttach, false, linkName, senderHandle);
+
+ // Give the necessary response to attach for receiver
+ Attach recieverResponseAttach = new Attach();
+ recieverResponseAttach.setHandle(receiverHandle);
+ recieverResponseAttach.setRole(Role.SENDER); // we receive, so peer response is sender role
+ recieverResponseAttach.setName(linkName);
+ recieverResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, recieverResponseAttach, null));
+
+ // Verify the local link state and events were updated as expected
+ assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ Event recieverRemoteOpenEvent = collector.peek();
+ assertSame("Unexpected event context", receiver, recieverRemoteOpenEvent.getContext());
+ assertEquals(Event.Type.LINK_REMOTE_OPEN, recieverRemoteOpenEvent.getEventType());
+ assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+ // Give the necessary response to attach for sender
+ Attach senderResponseAttach = new Attach();
+ senderResponseAttach.setHandle(senderHandle);
+ senderResponseAttach.setRole(Role.RECEIVER); // we send, so peer response is receiver role
+ senderResponseAttach.setName(linkName);
+ senderResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, senderResponseAttach, null));
+
+ // Verify the local link state and events were updated as expected
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ Event senderRemoteOpenEvent = collector.peek();
+ assertSame("Unexpected event context", sender, senderRemoteOpenEvent.getContext());
+ assertEquals(Event.Type.LINK_REMOTE_OPEN, senderRemoteOpenEvent.getEventType());
+ assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+ // Use the opened sender and receiver a bit
+ exerciseOpenedSenderAndReceiver(transport, collector, session, sender, senderHandle, receiver, receiverHandle);
+ }
+
+ @Test
+ public void testSequentialOpenOfSenderAndReceiverLinksWithSameName() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+
+ assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT,
+ Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ // Give the necessary responses to open/begin
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ final UnsignedInteger receiverHandle = UnsignedInteger.ZERO;
+ final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+
+ // Open a receiver
+ String linkName = "myLinkName";
+ Receiver receiver = session.receiver(linkName);
+ assertEndpointState(receiver, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+ receiver.open();
+
+ assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+ assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+ Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+ Attach receiverAttach = (Attach) transport.writes.get(2);
+ validateAttach(receiverAttach, true, linkName, receiverHandle);
+
+ // Give the necessary response to attach for receiver
+ Attach recieverResponseAttach = new Attach();
+ recieverResponseAttach.setHandle(receiverHandle);
+ recieverResponseAttach.setRole(Role.SENDER); // we receive, so peer response is sender role
+ recieverResponseAttach.setName(linkName);
+ recieverResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, recieverResponseAttach, null));
+
+ // Verify the local link state and events were updated as expected
+ assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ Event recieverRemoteOpenEvent = collector.peek();
+ assertSame("Unexpected event context", receiver, recieverRemoteOpenEvent.getContext());
+ assertEquals(Event.Type.LINK_REMOTE_OPEN, recieverRemoteOpenEvent.getEventType());
+ assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+ // Open a sender, with same link name
+ Sender sender = session.sender(linkName);
+ assertEndpointState(sender, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+ sender.open();
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+ assertEvents(collector, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
+ Attach senderAttach = (Attach) transport.writes.get(3);
+ validateAttach(senderAttach, false, linkName, senderHandle);
+
+ // Give the necessary response to attach for sender
+ Attach senderResponseAttach = new Attach();
+ senderResponseAttach.setHandle(senderHandle);
+ senderResponseAttach.setRole(Role.RECEIVER); // we send, so peer response is receiver role
+ senderResponseAttach.setName(linkName);
+ senderResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, senderResponseAttach, null));
+
+ // Verify the local link state and events were updated as expected
+
+ assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+ Event senderRemoteOpenEvent = collector.peek();
+ assertSame("Unexpected event context", sender, senderRemoteOpenEvent.getContext());
+ assertEquals(Event.Type.LINK_REMOTE_OPEN, senderRemoteOpenEvent.getEventType());
+ assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+ // Use the opened sender and receiver a bit
+ exerciseOpenedSenderAndReceiver(transport, collector, session, sender, senderHandle, receiver, receiverHandle);
+ }
+
+ private void validateAttach(Attach attach, boolean isReceiver, String linkName, UnsignedInteger handle)
+ {
+ Role role = attach.getRole();
+ assertEquals(isReceiver ? Role.RECEIVER : Role.SENDER, role);
+ assertEquals(isReceiver, role.getValue());
+ assertEquals(linkName, attach.getName());
+ assertEquals(handle, attach.getHandle());
+ }
+
+ protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState)
+ {
+ assertEquals(localState, endpoint.getLocalState());
+ assertEquals(remoteState, endpoint.getRemoteState());
+ }
+
+ private void exerciseOpenedSenderAndReceiver(MockTransportImpl transport, Collector collector, Session session,
+ Sender sender, UnsignedInteger senderHandle, Receiver receiver, UnsignedInteger receiverHandle)
+ {
+ // Send a delivery
+ Flow flow = new Flow();
+ flow.setHandle(senderHandle);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ONE);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow.setLinkCredit(UnsignedInteger.valueOf(5));
+
+ transport.handleFrame(new TransportFrame(0, flow, null));
+
+ Event senderFlowEvent = collector.peek();
+ assertEquals(Event.Type.LINK_FLOW, senderFlowEvent.getEventType());
+ assertSame("Unexpected event context", sender, senderFlowEvent.getContext());
+ assertEvents(collector, Event.Type.LINK_FLOW);
+
+ String sentDeliveryTag = "sendTag";
+ sendMessage(sender, sentDeliveryTag, "content");
+ assertEvents(collector, Event.Type.TRANSPORT);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Transfer);
+ Transfer transfer = (Transfer) transport.writes.get(4);
+ assertEquals(senderHandle, transfer.getHandle());
+ assertEquals(UnsignedInteger.ZERO, transfer.getDeliveryId());
+ assertEquals(new Binary(sentDeliveryTag.getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+ assertEquals(false, transfer.getMore());
+
+ assertNoEvents(collector);
+
+ // Receive a delivery
+ int credits = 10;
+ receiver.flow(credits);
+ assertEvents(collector, Event.Type.TRANSPORT);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Flow);
+ Flow sentFlow = (Flow) transport.writes.get(5);
+ assertEquals(receiverHandle, sentFlow.getHandle());
+ assertEquals(UnsignedInteger.valueOf(credits), sentFlow.getLinkCredit());
+ assertEquals(UnsignedInteger.ZERO, sentFlow.getDeliveryCount());
+
+ assertNoEvents(collector);
+ String recievedDeliveryTag = "recvTag";
+ handleTransfer(transport, receiverHandle, 1, recievedDeliveryTag, new byte[]{ 1, 2, 3 });
+ assertEvents(collector, Event.Type.DELIVERY);
+
+ assertEquals("Unexpected queued count", 1, receiver.getQueued());
+ assertEquals("Unexpected credit", 10, receiver.getCredit());
+ assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+ verifyDeliveryRawPayload(receiver, recievedDeliveryTag, new byte[] { 1, 2, 3 });
+
+ assertEquals("Unexpected queued count", 0, receiver.getQueued());
+ assertEquals("Unexpected credit", 9, receiver.getCredit());
+ assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+ assertNoEvents(collector);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org