You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/10/14 15:28:20 UTC

[qpid-proton-j] branch master updated: PROTON-2284: track half-open sender and receiver links seperately to resolve upon peer attach

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 754d40e  PROTON-2284: track half-open sender and receiver links seperately to resolve upon peer attach
754d40e is described below

commit 754d40e2a2e58aab7a252d933daef6d572f9f508
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Oct 14 16:21:47 2020 +0100

    PROTON-2284: track half-open sender and receiver links seperately to resolve upon peer attach
---
 .../qpid/proton/engine/impl/TransportImpl.java     |   8 +-
 .../qpid/proton/engine/impl/TransportSession.java  |  25 +-
 .../qpid/proton/engine/impl/TransportImplTest.java | 304 ++++++++++++++++++++-
 3 files changed, 328 insertions(+), 9 deletions(-)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 7307aff..aa30496 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -800,11 +800,12 @@ public class TransportImpl extends EndpointImpl
                             && !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED)
                         {
 
+                            Role role = endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER;
                             UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink);
 
                             if(link.getRemoteState() == EndpointState.UNINITIALIZED)
                             {
-                                transportSession.addHalfOpenLink(transportLink);
+                                transportSession.addHalfOpenLink(transportLink, Role.SENDER == role);
                             }
 
                             Attach attach = new Attach();
@@ -851,7 +852,7 @@ public class TransportImpl extends EndpointImpl
                                 attach.setMaxMessageSize(link.getMaxMessageSize());
                             }
 
-                            attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
+                            attach.setRole(role);
 
                             if(link instanceof SenderImpl)
                             {
@@ -1268,7 +1269,8 @@ public class TransportImpl extends EndpointImpl
             }
             else
             {
-                transportLink = transportSession.resolveHalfOpenLink(attach.getName());
+                // We flip the peer role to determine our local role, and try to resolve an existing half-open link
+                transportLink = transportSession.resolveHalfOpenLink(attach.getName(), Role.RECEIVER == attach.getRole());
                 if(transportLink == null)
                 {
 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index e473675..0bab291 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -54,7 +54,8 @@ class TransportSession
 
     private final Map<UnsignedInteger, TransportLink<?>> _remoteHandlesMap = new HashMap<UnsignedInteger, TransportLink<?>>();
     private final Map<UnsignedInteger, TransportLink<?>> _localHandlesMap = new HashMap<UnsignedInteger, TransportLink<?>>();
-    private final Map<String, TransportLink> _halfOpenLinks = new HashMap<String, TransportLink>();
+    private final Map<String, TransportLink> _halfOpenSenderLinks = new HashMap<String, TransportLink>();
+    private final Map<String, TransportLink> _halfOpenReceiverLinks = new HashMap<String, TransportLink>();
 
 
     private UnsignedInteger _incomingDeliveryId = null;
@@ -251,14 +252,28 @@ class TransportSession
         _remoteHandlesMap.remove(handle);
     }
 
-    public TransportLink resolveHalfOpenLink(String name)
+    public TransportLink resolveHalfOpenLink(String name, boolean isSender)
     {
-        return _halfOpenLinks.remove(name);
+        if(isSender)
+        {
+            return _halfOpenSenderLinks.remove(name);
+        }
+        else
+        {
+            return _halfOpenReceiverLinks.remove(name);
+        }
     }
 
-    public void addHalfOpenLink(TransportLink link)
+    public void addHalfOpenLink(TransportLink link, boolean isSender)
     {
-        _halfOpenLinks.put(link.getName(), link);
+        if(isSender)
+        {
+            _halfOpenSenderLinks.put(link.getName(), link);
+        }
+        else
+        {
+            _halfOpenReceiverLinks.put(link.getName(), link);
+        }
     }
 
     public void handleTransfer(Transfer transfer, Binary payload)
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 5217421..a40da89 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
@@ -67,6 +68,7 @@ import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
@@ -2973,6 +2975,11 @@ public class TransportImplTest
         assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
     }
 
+    private void handleTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] payload)
+    {
+        handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, payload, false);
+    }
+
     private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
     {
         handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
@@ -4143,7 +4150,7 @@ public class TransportImplTest
 
         Attach attach = new Attach();
         attach.setHandle(UnsignedInteger.ZERO);
-        attach.setRole(Role.SENDER);
+        attach.setRole(Role.RECEIVER);
         attach.setName(linkName);
         attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
         transport.handleFrame(new TransportFrame(0, attach, null));
@@ -4590,4 +4597,299 @@ public class TransportImplTest
         assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
         assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
     }
+
+    @Test
+    public void testParallelOpenOfSenderAndReceiverLinksWithSameName()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT,
+                Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        // Give the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        final UnsignedInteger receiverHandle = UnsignedInteger.ZERO;
+        final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+
+        // Open a receiver and a sender with same link name in parallel, i.e send both
+        // attaches, such that both are locally open but neither is yet remotely open.
+        String linkName = "myLinkName";
+        Receiver receiver = session.receiver(linkName);
+        assertEndpointState(receiver, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+
+        Sender sender = session.sender(linkName);
+        assertEndpointState(sender, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+
+        receiver.open();
+        sender.open();
+
+        assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+        assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+                Event.Type.LINK_INIT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT,
+                Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+        Attach receiverAttach = (Attach) transport.writes.get(2);
+        validateAttach(receiverAttach, true, linkName, receiverHandle);
+
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
+        Attach senderAttach = (Attach) transport.writes.get(3);
+        validateAttach(senderAttach, false, linkName, senderHandle);
+
+        // Give the necessary response to attach for receiver
+        Attach recieverResponseAttach = new Attach();
+        recieverResponseAttach.setHandle(receiverHandle);
+        recieverResponseAttach.setRole(Role.SENDER); // we receive, so peer response is sender role
+        recieverResponseAttach.setName(linkName);
+        recieverResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, recieverResponseAttach, null));
+
+        // Verify the local link state and events were updated as expected
+        assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        Event recieverRemoteOpenEvent = collector.peek();
+        assertSame("Unexpected event context", receiver, recieverRemoteOpenEvent.getContext());
+        assertEquals(Event.Type.LINK_REMOTE_OPEN, recieverRemoteOpenEvent.getEventType());
+        assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+        // Give the necessary response to attach for sender
+        Attach senderResponseAttach = new Attach();
+        senderResponseAttach.setHandle(senderHandle);
+        senderResponseAttach.setRole(Role.RECEIVER); // we send, so peer response is receiver role
+        senderResponseAttach.setName(linkName);
+        senderResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, senderResponseAttach, null));
+
+        // Verify the local link state and events were updated as expected
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        Event senderRemoteOpenEvent = collector.peek();
+        assertSame("Unexpected event context", sender, senderRemoteOpenEvent.getContext());
+        assertEquals(Event.Type.LINK_REMOTE_OPEN, senderRemoteOpenEvent.getEventType());
+        assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+        // Use the opened sender and receiver a bit
+        exerciseOpenedSenderAndReceiver(transport, collector, session, sender, senderHandle, receiver, receiverHandle);
+    }
+
+    @Test
+    public void testSequentialOpenOfSenderAndReceiverLinksWithSameName() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+
+        assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT,
+                Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        // Give the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        final UnsignedInteger receiverHandle = UnsignedInteger.ZERO;
+        final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+
+        // Open a receiver
+        String linkName = "myLinkName";
+        Receiver receiver = session.receiver(linkName);
+        assertEndpointState(receiver, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+        receiver.open();
+
+        assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+        assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+                Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+        Attach receiverAttach = (Attach) transport.writes.get(2);
+        validateAttach(receiverAttach, true, linkName, receiverHandle);
+
+        // Give the necessary response to attach for receiver
+        Attach recieverResponseAttach = new Attach();
+        recieverResponseAttach.setHandle(receiverHandle);
+        recieverResponseAttach.setRole(Role.SENDER); // we receive, so peer response is sender role
+        recieverResponseAttach.setName(linkName);
+        recieverResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, recieverResponseAttach, null));
+
+        // Verify the local link state and events were updated as expected
+        assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        Event recieverRemoteOpenEvent = collector.peek();
+        assertSame("Unexpected event context", receiver, recieverRemoteOpenEvent.getContext());
+        assertEquals(Event.Type.LINK_REMOTE_OPEN, recieverRemoteOpenEvent.getEventType());
+        assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+        // Open a sender, with same link name
+        Sender sender = session.sender(linkName);
+        assertEndpointState(sender, EndpointState.UNINITIALIZED, EndpointState.UNINITIALIZED);
+        sender.open();
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
+
+        assertEvents(collector, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
+        Attach senderAttach = (Attach) transport.writes.get(3);
+        validateAttach(senderAttach, false, linkName, senderHandle);
+
+        // Give the necessary response to attach for sender
+        Attach senderResponseAttach = new Attach();
+        senderResponseAttach.setHandle(senderHandle);
+        senderResponseAttach.setRole(Role.RECEIVER); // we send, so peer response is receiver role
+        senderResponseAttach.setName(linkName);
+        senderResponseAttach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, senderResponseAttach, null));
+
+        // Verify the local link state and events were updated as expected
+
+        assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE);
+
+        Event senderRemoteOpenEvent = collector.peek();
+        assertSame("Unexpected event context", sender, senderRemoteOpenEvent.getContext());
+        assertEquals(Event.Type.LINK_REMOTE_OPEN, senderRemoteOpenEvent.getEventType());
+        assertEvents(collector, Event.Type.LINK_REMOTE_OPEN);
+
+        // Use the opened sender and receiver a bit
+        exerciseOpenedSenderAndReceiver(transport, collector, session, sender, senderHandle, receiver, receiverHandle);
+    }
+
+    private void validateAttach(Attach attach, boolean isReceiver, String linkName, UnsignedInteger handle)
+    {
+        Role role = attach.getRole();
+        assertEquals(isReceiver ? Role.RECEIVER : Role.SENDER, role);
+        assertEquals(isReceiver, role.getValue());
+        assertEquals(linkName, attach.getName());
+        assertEquals(handle, attach.getHandle());
+    }
+
+    protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState)
+    {
+        assertEquals(localState, endpoint.getLocalState());
+        assertEquals(remoteState, endpoint.getRemoteState());
+    }
+
+    private void exerciseOpenedSenderAndReceiver(MockTransportImpl transport, Collector collector, Session session,
+                                                 Sender sender, UnsignedInteger senderHandle, Receiver receiver, UnsignedInteger receiverHandle)
+    {
+        // Send a delivery
+        Flow flow = new Flow();
+        flow.setHandle(senderHandle);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(5));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        Event senderFlowEvent = collector.peek();
+        assertEquals(Event.Type.LINK_FLOW, senderFlowEvent.getEventType());
+        assertSame("Unexpected event context", sender, senderFlowEvent.getContext());
+        assertEvents(collector, Event.Type.LINK_FLOW);
+
+        String sentDeliveryTag = "sendTag";
+        sendMessage(sender, sentDeliveryTag, "content");
+        assertEvents(collector, Event.Type.TRANSPORT);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Transfer);
+        Transfer transfer = (Transfer) transport.writes.get(4);
+        assertEquals(senderHandle, transfer.getHandle());
+        assertEquals(UnsignedInteger.ZERO, transfer.getDeliveryId());
+        assertEquals(new Binary(sentDeliveryTag.getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+        assertEquals(false, transfer.getMore());
+
+        assertNoEvents(collector);
+
+        // Receive a delivery
+        int credits = 10;
+        receiver.flow(credits);
+        assertEvents(collector, Event.Type.TRANSPORT);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Flow);
+        Flow sentFlow = (Flow) transport.writes.get(5);
+        assertEquals(receiverHandle, sentFlow.getHandle());
+        assertEquals(UnsignedInteger.valueOf(credits), sentFlow.getLinkCredit());
+        assertEquals(UnsignedInteger.ZERO, sentFlow.getDeliveryCount());
+
+        assertNoEvents(collector);
+        String recievedDeliveryTag = "recvTag";
+        handleTransfer(transport, receiverHandle, 1, recievedDeliveryTag, new byte[]{ 1, 2, 3 });
+        assertEvents(collector, Event.Type.DELIVERY);
+
+        assertEquals("Unexpected queued count", 1, receiver.getQueued());
+        assertEquals("Unexpected credit", 10, receiver.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        verifyDeliveryRawPayload(receiver, recievedDeliveryTag, new byte[] { 1, 2, 3 });
+
+        assertEquals("Unexpected queued count", 0, receiver.getQueued());
+        assertEquals("Unexpected credit", 9, receiver.getCredit());
+        assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+        assertNoEvents(collector);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org