You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2018/07/31 10:04:59 UTC

[1/2] qpid-proton-j git commit: PROTON-1901: fix handling of multiplexed incoming deliveries on a session, check for various sequencing issues that would cause illegal states and knock-on problems

Repository: qpid-proton-j
Updated Branches:
  refs/heads/0.27.x e9db73daa -> 1f1408a30


PROTON-1901: fix handling of multiplexed incoming deliveries on a session, check for various sequencing issues that would cause illegal states and knock-on problems

(cherry picked from commit 2880325e013d9ef675fe61ea254d65a615558d15)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/40edf351
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/40edf351
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/40edf351

Branch: refs/heads/0.27.x
Commit: 40edf3516451a10a2dada3cd10687b7e6b19d6be
Parents: e9db73d
Author: Robbie Gemmell <ro...@apache.org>
Authored: Fri Jul 27 12:46:14 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Mon Jul 30 16:04:52 2018 +0100

----------------------------------------------------------------------
 .../proton/engine/impl/TransportReceiver.java   |  10 +-
 .../proton/engine/impl/TransportSession.java    |  51 +-
 .../proton/engine/impl/TransportImplTest.java   | 605 ++++++++++++++++++-
 3 files changed, 647 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/40edf351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
index 29d97c4..6e588d4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
@@ -21,11 +21,12 @@
 
 package org.apache.qpid.proton.engine.impl;
 
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.transport.Flow;
 
 class TransportReceiver extends TransportLink<ReceiverImpl>
 {
-
+    private UnsignedInteger _incomingDeliveryId;
 
     TransportReceiver(ReceiverImpl link)
     {
@@ -52,7 +53,14 @@ class TransportReceiver extends TransportLink<ReceiverImpl>
             setDeliveryCount(getRemoteDeliveryCount());
             getLink().setDrained(getLink().getDrained() + delta);
         }
+    }
 
+    UnsignedInteger getIncomingDeliveryId() {
+        return _incomingDeliveryId;
+    }
 
+    void setIncomingDeliveryId(UnsignedInteger _incomingDeliveryId) {
+        this._incomingDeliveryId = _incomingDeliveryId;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/40edf351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index bbacd30..567e8eb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -264,22 +264,23 @@ class TransportSession
     public void handleTransfer(Transfer transfer, Binary payload)
     {
         DeliveryImpl delivery;
-        incrementNextIncomingId();
-        if(transfer.getDeliveryId() == null || transfer.getDeliveryId().equals(_incomingDeliveryId))
+        incrementNextIncomingId(); // The conceptual/non-wire transfer-id, for the session window.
+
+        TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+        UnsignedInteger linkIncomingDeliveryId = transportReceiver.getIncomingDeliveryId();
+        UnsignedInteger deliveryId = transfer.getDeliveryId();
+
+        if(linkIncomingDeliveryId != null && (linkIncomingDeliveryId.equals(deliveryId) || deliveryId == null))
         {
-            TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
-            ReceiverImpl receiver = transportReceiver.getReceiver();
-            Binary deliveryTag = transfer.getDeliveryTag();
-            delivery = _unsettledIncomingDeliveriesById.get(_incomingDeliveryId);
+            delivery = _unsettledIncomingDeliveriesById.get(linkIncomingDeliveryId);
             delivery.getTransportDelivery().incrementSessionSize();
-
         }
         else
         {
-            // TODO - check deliveryId has been incremented by one
-            _incomingDeliveryId = transfer.getDeliveryId();
-            // TODO - check link handle valid and a receiver
-            TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+            verifyNewDeliveryIdSequence(_incomingDeliveryId, linkIncomingDeliveryId, deliveryId);
+
+            _incomingDeliveryId = deliveryId;
+
             ReceiverImpl receiver = transportReceiver.getReceiver();
             Binary deliveryTag = transfer.getDeliveryTag();
             delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(),
@@ -288,11 +289,13 @@ class TransportSession
             if(messageFormat != null) {
                 delivery.setMessageFormat(messageFormat.intValue());
             }
-            TransportDelivery transportDelivery = new TransportDelivery(_incomingDeliveryId, delivery, transportReceiver);
+            TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportReceiver);
             delivery.setTransportDelivery(transportDelivery);
-            _unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
+            transportReceiver.setIncomingDeliveryId(deliveryId);
+            _unsettledIncomingDeliveriesById.put(deliveryId, delivery);
             getSession().incrementIncomingDeliveries(1);
         }
+
         if( transfer.getState()!=null )
         {
             delivery.setRemoteDeliveryState(transfer.getState());
@@ -307,12 +310,18 @@ class TransportSession
 
         delivery.updateWork();
 
+        if(!transfer.getMore() || transfer.getAborted())
+        {
+            transportReceiver.setIncomingDeliveryId(null);
+        }
+
         if(!(transfer.getMore() || transfer.getAborted()))
         {
             delivery.setComplete();
             delivery.getLink().getTransportLink().decrementLinkCredit();
             delivery.getLink().getTransportLink().incrementDeliveryCount();
         }
+
         if(Boolean.TRUE.equals(transfer.getSettled()))
         {
             delivery.setRemoteSettled(true);
@@ -328,6 +337,22 @@ class TransportSession
         getSession().getConnection().put(Event.Type.DELIVERY, delivery);
     }
 
+    private void verifyNewDeliveryIdSequence(UnsignedInteger previousId, UnsignedInteger linkIncomingId, UnsignedInteger newDeliveryId) {
+        if(newDeliveryId == null) {
+            throw new IllegalStateException("No delivery-id specified on first Transfer of new delivery");
+        }
+
+        // Do a primitive comparison using intValue(), since it's a uint sequence
+        // and we need the primitive values to wrap appropriately during comparison.
+        if(previousId != null && previousId.intValue() + 1 != newDeliveryId.intValue()) {
+            throw new IllegalStateException("Expected delivery-id " + previousId.add(UnsignedInteger.ONE) + ", got " + newDeliveryId);
+        }
+
+        if(linkIncomingId != null) {
+            throw new IllegalStateException("Illegal multiplex of deliveries on same link with delivery-id " + linkIncomingId + " and " + newDeliveryId);
+        }
+    }
+
     public void freeLocalChannel()
     {
         unsetLocalChannel();

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/40edf351/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 9e3afd6..d66964b 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -40,12 +40,14 @@ import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
 import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
 import org.apache.qpid.proton.amqp.transport.End;
 import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
@@ -1712,6 +1714,28 @@ public class TransportImplTest
         return delivery;
     }
 
+    private Delivery verifyDeliveryRawPayload(Receiver receiver, String deliveryTag, byte[] payload)
+    {
+        Delivery delivery = receiver.current();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[delivery.pending()];
+        int len = receiver.recv(received, 0, BUFFER_SIZE);
+
+        assertEquals("unexpected length", len, received.length);
+
+        assertArrayEquals("Received delivery payload not as expected", payload, received);
+
+        boolean receiverAdvanced = receiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        return delivery;
+    }
+
     /**
      * Verify that the {@link TransportInternal#addTransportLayer(TransportLayer)} has the desired
      * effect by observing the wrapping effect on related transport input and output methods.
@@ -2645,16 +2669,16 @@ public class TransportImplTest
     }
 
     @Test
-    public void testMultiplexMultiFrameDeliveryOnSingleSession() {
-        doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(false);
+    public void testMultiplexMultiFrameDeliveryOnSingleSessionOutgoing() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(false);
     }
 
     @Test
-    public void testMultiplexMultiFrameDeliveriesOnSingleSession() {
-        doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(true);
+    public void testMultiplexMultiFrameDeliveriesOnSingleSessionOutgoing() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(true);
     }
 
-    private void doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(boolean bothDeliveriesMultiFrame) {
+    private void doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(boolean bothDeliveriesMultiFrame) {
         MockTransportImpl transport = new MockTransportImpl();
         transport.setEmitFlowEventOnSend(false);
 
@@ -2788,4 +2812,575 @@ public class TransportImplTest
             assertEquals("Unexpected more flag", false, transfer.getMore());
         }
     }
+
+    @Test
+    public void testMultiplexMultiFrameDeliveriesOnSingleSessionIncoming() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(true);
+    }
+
+    @Test
+    public void testMultiplexMultiFrameDeliveryOnSingleSessionIncoming() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(false);
+    }
+
+    private void doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(boolean bothDeliveriesMultiFrame) {
+        int contentLength1 = 7000;
+        int maxPayloadChunkSize = 2000;
+        int contentLength2 = 1000;
+        if(bothDeliveriesMultiFrame) {
+            contentLength2 = 4100;
+        }
+
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        FrameBody frame = transport.writes.get(2);
+        assertTrue("Unexpected frame type", frame instanceof Attach);
+        assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r1handle);
+        frame = transport.writes.get(3);
+        assertTrue("Unexpected frame type", frame instanceof Attach);
+        assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r2handle);
+        frame = transport.writes.get(4);
+        assertTrue("Unexpected frame type", frame instanceof Flow);
+        assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r1handle);
+        frame = transport.writes.get(5);
+        assertTrue("Unexpected frame type", frame instanceof Flow);
+        assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r2handle);
+
+        assertNull("Should not yet have a delivery", receiver1.current());
+        assertNull("Should not yet have a delivery", receiver2.current());
+
+        // Send the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String messageContent1 = createLargeContent(contentLength1);
+        String deliveryTag2 = "tag2";
+        String messageContent2 = createLargeContent(contentLength2);
+
+        ArrayList<byte[]> message1chunks = createTransferPayloads(messageContent1, maxPayloadChunkSize);
+        assertEquals("unexpected number of payload chunks", 4, message1chunks.size());
+        ArrayList<byte[]> message2chunks = createTransferPayloads(messageContent2, maxPayloadChunkSize);
+        if(bothDeliveriesMultiFrame) {
+            assertEquals("unexpected number of payload chunks", 3, message2chunks.size());
+        } else {
+            assertEquals("unexpected number of payload chunks", 1, message2chunks.size());
+        }
+
+        while (true) {
+           if (!message1chunks.isEmpty()) {
+              byte[] chunk = message1chunks.remove(0);
+              handlePartialTransfer(transport, r1handle, 1, deliveryTag1, chunk, !message1chunks.isEmpty());
+           }
+
+           if (!message2chunks.isEmpty()) {
+               byte[] chunk = message2chunks.remove(0);
+               handlePartialTransfer(transport, r2handle, 2, deliveryTag2, chunk, !message2chunks.isEmpty());
+            }
+
+           if (message1chunks.isEmpty() && message2chunks.isEmpty()) {
+              break;
+           }
+        }
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        Delivery delivery1 = verifyDelivery(receiver1, deliveryTag1, messageContent1);
+        assertNotNull("Should now have a delivery", delivery1);
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+
+        assertEquals("Unexpected queued count", 1, receiver2.getQueued());
+        Delivery delivery2 = verifyDelivery(receiver2, deliveryTag2, messageContent2);
+        assertNotNull("Should now have a delivery", delivery2);
+        assertEquals("Unexpected queued count", 0, receiver2.getQueued());
+
+        delivery1.disposition(Accepted.getInstance());
+        delivery1.settle();
+        pumpMockTransport(transport);
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
+
+        frame = transport.writes.get(6);
+        assertTrue("Unexpected frame type", frame instanceof Disposition);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.ONE);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.ONE);
+
+        delivery2.disposition(Accepted.getInstance());
+        delivery2.settle();
+        pumpMockTransport(transport);
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 8, transport.writes.size());
+
+        frame = transport.writes.get(7);
+        assertTrue("Unexpected frame type", frame instanceof Disposition);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.valueOf(2));
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+    {
+        handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+    {
+        handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, false);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Transfer transfer = new Transfer();
+        transfer.setHandle(handle);
+        transfer.setDeliveryTag(new Binary(tag));
+        transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
+        transfer.setMore(more);
+        transfer.setAborted(aborted);
+        if(deliveryId != null) {
+            // Can be omitted in continuation frames for a given delivery.
+            transfer.setDeliveryId(deliveryId);
+        }
+
+        transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
+    }
+
+    private ArrayList<byte[]> createTransferPayloads(String content, int payloadChunkSize)
+    {
+        ArrayList<byte[]> payloadChunks = new ArrayList<>();
+
+        Message m = Message.Factory.create();
+        m.setBody(new AmqpValue(content));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        int copied = 0;
+        while(copied < len) {
+            int chunkSize = Math.min(len - copied, payloadChunkSize);
+            byte[] chunk = new byte[chunkSize];
+
+            System.arraycopy(encoded, copied, chunk, 0, chunkSize);
+
+            payloadChunks.add(chunk);
+            copied += chunkSize;
+        }
+
+        assertFalse("no payload chunks to return", payloadChunks.isEmpty());
+
+        return payloadChunks;
+    }
+
+    @Test
+    public void testDeliveryIdOutOfSequenceCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+
+        handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 2 }, false);
+        try {
+            handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, false);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertTrue("Unexpected exception:" + ise, ise.getMessage().contains("Expected delivery-id 3, got 1"));
+        }
+    }
+
+    @Test
+    public void testDeliveryIdMissingOnInitialTransferCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive a delivery without any delivery-id on the [first] transfer frame, expect it to fail.
+        try {
+            handlePartialTransfer(transport, r1handle, null, "tag1", new byte[]{ 1 }, false);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertEquals("Unexpected message", "No delivery-id specified on first Transfer of new delivery", ise.getMessage());
+        }
+    }
+
+    @Test
+    public void testMultiplexDeliveriesOnSameReceiverLinkCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive first transfer for a multi-frame delivery
+        handlePartialTransfer(transport, r1handle, 1, "tag1", new byte[]{ 1 }, true);
+
+        // Receive first transfer for ANOTHER multi-frame delivery, expect it to fail
+        // as multiplexing deliveries on a single link is forbidden by the spec.
+        try {
+            handlePartialTransfer(transport, r1handle, 2, "tag2", new byte[]{ 2 }, true);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertEquals("Unexpected message", "Illegal multiplex of deliveries on same link with delivery-id 1 and 2", ise.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeliveryIdTrackingHandlesAbortedDelivery() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive first transfer for a multi-frame delivery
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 1 }, true);
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        // Receive second transfer for a multi-frame delivery, indicating it is aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 2 }, true, true);
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+
+        // Receive first transfer for ANOTHER delivery, expect it not to fail, since the earlier delivery was aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, "tag2", new byte[]{ 3 }, false);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+
+        receiver1.advance();
+        verifyDeliveryRawPayload(receiver1, "tag2", new byte[] { 3 });
+    }
+
+    @Test
+    public void testDeliveryWithIdOmittedOnContinuationTransfers() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+
+        // Send multi-frame deliveries for each link, multiplexed together, and omit
+        // the delivery-id on the continuation frames as allowed for by the spec.
+        handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, true);
+        handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 101 }, true);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 102 }, true);
+        handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 2 }, true);
+        handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 3 }, false);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 103 }, true);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 104 }, false);
+
+        // Verify the transfer frames were all matched to compose the expected delivery payload.
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2, 3 });
+        verifyDeliveryRawPayload(receiver2, deliveryTag2, new byte[] { 101, 102, 103, 104 });
+    }
+
+    @Test
+    public void testDeliveryIdThresholdsAndWraps() {
+        // Check start from 0
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.ZERO, UnsignedInteger.ONE, UnsignedInteger.valueOf(2));
+        // Check run up to max-int (interesting boundary for underlying impl)
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(Integer.MAX_VALUE - 2), UnsignedInteger.valueOf(Integer.MAX_VALUE -1), UnsignedInteger.valueOf(Integer.MAX_VALUE));
+        // Check crossing from signed range value into unsigned range value (interesting boundary for underlying impl)
+        long maxIntAsLong = Integer.MAX_VALUE;
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(maxIntAsLong), UnsignedInteger.valueOf(maxIntAsLong + 1L), UnsignedInteger.valueOf(maxIntAsLong + 2L));
+        // Check run up to max-uint
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(0xFFFFFFFFL - 2), UnsignedInteger.valueOf(0xFFFFFFFFL - 1), UnsignedInteger.MAX_VALUE);
+        // Check wrapping from max unsigned value back to min(/0).
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.MAX_VALUE, UnsignedInteger.ZERO, UnsignedInteger.ONE);
+    }
+
+    private void doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger deliveryId1, UnsignedInteger deliveryId2, UnsignedInteger deliveryId3) {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+        String deliveryTag3 = "tag3";
+
+        // Send deliveries with the given delivery-id
+        handlePartialTransfer(transport, r1handle, deliveryId1, deliveryTag1, new byte[]{ 1 }, false);
+        handlePartialTransfer(transport, r1handle, deliveryId2, deliveryTag2, new byte[]{ 2 }, false);
+        handlePartialTransfer(transport, r1handle, deliveryId3, deliveryTag3, new byte[]{ 3 }, false);
+
+        // Verify deliveries arrived with expected payload
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1 });
+        verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
+        verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton-j git commit: PROTON-1902: fix/allow handling of aborted deliveries

Posted by ro...@apache.org.
PROTON-1902: fix/allow handling of aborted deliveries

Ensure 'aborted' flag overrules the 'more' and 'settled' flags as appropriate, account for delivery properly, and allow determining it was aborted.

(cherry picked from commit b5cd0a4601d58b3fc5440cf3f586922fcf3993ea)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/1f1408a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/1f1408a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/1f1408a3

Branch: refs/heads/0.27.x
Commit: 1f1408a30105faf342126c5399451be30335d22f
Parents: 40edf35
Author: Robbie Gemmell <ro...@apache.org>
Authored: Mon Jul 30 13:43:59 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Mon Jul 30 16:05:24 2018 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/engine/Delivery.java |  22 +++
 .../qpid/proton/engine/impl/DeliveryImpl.java   |  12 ++
 .../proton/engine/impl/TransportSession.java    |  16 +-
 .../proton/engine/impl/TransportImplTest.java   | 147 +++++++++++++++++++
 4 files changed, 190 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index 58c62b6..b997309 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -99,8 +99,30 @@ public interface Delivery extends Extendable
 
     public void clear();
 
+    /**
+     * Checks whether the delivery is still partial.
+     *
+     * For a receiving Delivery, this means the delivery does not hold
+     * a complete message payload because not all of the content has
+     * been received yet. Note that an {@link #isAborted() aborted}
+     * delivery is also considered partial, and its full payload will
+     * never be received.
+     *
+     * For a sending Delivery, this means the sender link has not been
+     * {@link Sender#advance() advanced} to complete the delivery yet.
+     *
+     * @return true if the delivery is partial
+     * @see #isAborted()
+     */
     public boolean isPartial();
 
+    /**
+     * Checks whether the delivery was aborted.
+     *
+     * @return true if the delivery was aborted.
+     */
+    boolean isAborted();
+
     public int pending();
 
     public boolean isBuffered();

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 14950a7..251c68a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -70,6 +70,7 @@ public class DeliveryImpl implements Delivery
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
+    private boolean _aborted;
 
     private CompositeReadableBuffer _dataBuffer;
     private ReadableBuffer _dataView;
@@ -479,6 +480,17 @@ public class DeliveryImpl implements Delivery
         _complete = true;
     }
 
+    void setAborted()
+    {
+        _aborted = true;
+    }
+
+    @Override
+    public boolean isAborted()
+    {
+        return _aborted;
+    }
+
     @Override
     public boolean isPartial()
     {

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 567e8eb..e473675 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -302,7 +302,8 @@ class TransportSession
         }
         _unsettledIncomingSize++;
 
-        if (payload != null)
+        boolean aborted = transfer.getAborted();
+        if (payload != null && !aborted)
         {
             delivery.append(payload);
             getSession().incrementIncomingBytes(payload.getLength());
@@ -310,19 +311,20 @@ class TransportSession
 
         delivery.updateWork();
 
-        if(!transfer.getMore() || transfer.getAborted())
+        if(!transfer.getMore() || aborted)
         {
             transportReceiver.setIncomingDeliveryId(null);
-        }
+            if(aborted) {
+                delivery.setAborted();
+            } else {
+                delivery.setComplete();
+            }
 
-        if(!(transfer.getMore() || transfer.getAborted()))
-        {
-            delivery.setComplete();
             delivery.getLink().getTransportLink().decrementLinkCredit();
             delivery.getLink().getTransportLink().incrementDeliveryCount();
         }
 
-        if(Boolean.TRUE.equals(transfer.getSettled()))
+        if(Boolean.TRUE.equals(transfer.getSettled()) || aborted)
         {
             delivery.setRemoteSettled(true);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index d66964b..fbc9b8f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -2975,6 +2975,11 @@ public class TransportImplTest
 
     private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
     {
+        handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, aborted, null);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted, Boolean settled)
+    {
         byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
 
         Transfer transfer = new Transfer();
@@ -2987,6 +2992,9 @@ public class TransportImplTest
             // Can be omitted in continuation frames for a given delivery.
             transfer.setDeliveryId(deliveryId);
         }
+        if(settled != null) {
+            transfer.setSettled(settled);
+        }
 
         transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
     }
@@ -3383,4 +3391,143 @@ public class TransportImplTest
         verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
         verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
     }
+
+    @Test
+    public void testAbortedDelivery() {
+        // Check aborted=true, more=false, settled=true.
+        doAbortedDeliveryTestImpl(false, true);
+        // Check aborted=true, more=false, settled=unset(false)
+        // Aborted overrides settled not being set.
+        doAbortedDeliveryTestImpl(false, null);
+        // Check aborted=true, more=false, settled=false
+        // Aborted overrides settled being explicitly false.
+        doAbortedDeliveryTestImpl(false, false);
+
+        // Check aborted=true, more=true, settled=true
+        // Aborted overrides the more=true.
+        doAbortedDeliveryTestImpl(true, true);
+        // Check aborted=true, more=true, settled=unset(false)
+        // Aborted overrides the more=true, and settled being unset.
+        doAbortedDeliveryTestImpl(true, null);
+        // Check aborted=true, more=true, settled=false
+        // Aborted overrides the more=true, and settled explicitly false.
+        doAbortedDeliveryTestImpl(true, false);
+    }
+
+    private void doAbortedDeliveryTestImpl(boolean setMoreOnAbortedTransfer, Boolean setSettledOnAbortedTransfer) {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(3);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+        String deliveryTag3 = "tag3";
+
+        // Receive first delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 1 }, true);
+        assertEquals("Unexpected incoming bytes count", 1, session.getIncomingBytes());
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 2 }, false);
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+
+        // Receive first transfer for a multi-frame delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 3 }, true);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+        // Receive second transfer for a multi-frame delivery, indicating it is aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 4 }, setMoreOnAbortedTransfer, true, setSettledOnAbortedTransfer);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        // The aborted frame's payload, if any, is dropped. Payload from earlier frames may already have been read and
+        // was already accounted for; although incomplete, it is left alone for the regular cleanup accounting to handle.
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Receive transfers for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 5 }, true);
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 6 }, false);
+        assertEquals("Unexpected queued count", 3, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 5, session.getIncomingBytes());
+
+        // Check the first delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2 });
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 2, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Check the aborted delivery
+        Delivery delivery = receiver1.current();
+        assertTrue(Arrays.equals(deliveryTag2.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+        assertTrue(delivery.isAborted());
+        assertTrue(delivery.remotelySettled()); // Since aborted implicitly means it is settled.
+        assertTrue(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[delivery.pending()];
+        int len = receiver1.recv(received, 0, BUFFER_SIZE);
+        assertEquals("unexpected length", len, received.length);
+
+        assertArrayEquals("Received delivery payload not as expected", new byte[] { 3 }, received);
+
+        assertTrue("receiver did not advance", receiver1.advance());
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected credit", 1, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+
+        // Check the third delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 5, 6 });
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+        assertEquals("Unexpected credit", 0, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+        // Flow new credit and check delivery-count + credit on wire are as expected.
+        receiver1.flow(123);
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Flow);
+        Flow sentFlow = (Flow) transport.writes.get(4);
+
+        assertEquals("Unexpected delivery count", UnsignedInteger.valueOf(3), sentFlow.getDeliveryCount());
+        assertEquals("Unexpected credit", UnsignedInteger.valueOf(123), sentFlow.getLinkCredit());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org