You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2018/07/31 10:05:00 UTC

[2/2] qpid-proton-j git commit: PROTON-1902: fix/allow handling of aborted deliveries

PROTON-1902: fix/allow handling of aborted deliveries

Ensure the 'aborted' flag overrules the 'more' and 'settled' flags as appropriate, account for the aborted delivery properly (link credit and delivery-count), and allow the application to determine that a delivery was aborted.

(cherry picked from commit b5cd0a4601d58b3fc5440cf3f586922fcf3993ea)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/1f1408a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/1f1408a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/1f1408a3

Branch: refs/heads/0.27.x
Commit: 1f1408a30105faf342126c5399451be30335d22f
Parents: 40edf35
Author: Robbie Gemmell <ro...@apache.org>
Authored: Mon Jul 30 13:43:59 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Mon Jul 30 16:05:24 2018 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/engine/Delivery.java |  22 +++
 .../qpid/proton/engine/impl/DeliveryImpl.java   |  12 ++
 .../proton/engine/impl/TransportSession.java    |  16 +-
 .../proton/engine/impl/TransportImplTest.java   | 147 +++++++++++++++++++
 4 files changed, 190 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index 58c62b6..b997309 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -99,8 +99,30 @@ public interface Delivery extends Extendable
 
     public void clear();
 
+    /**
+     * Checks whether the delivery is still partial.
+     *
+     * For a receiving Delivery, this means the delivery does not hold
+     * a complete message payload as all the content hasn't been
+     * received yet. Note that an {@link #isAborted() aborted} delivery
+     * will also be considered partial and the full payload won't
+     * be received.
+     *
+     * For a sending Delivery, this means the sender link has not been
+     * {@link Sender#advance() advanced} to complete the delivery yet.
+     *
+     * @return true if the delivery is partial
+     * @see #isAborted()
+     */
     public boolean isPartial();
 
+    /**
+     * Checks whether the delivery was aborted.
+     *
+     * @return true if the delivery was aborted.
+     */
+    boolean isAborted();
+
     public int pending();
 
     public boolean isBuffered();

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 14950a7..251c68a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -70,6 +70,7 @@ public class DeliveryImpl implements Delivery
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
+    private boolean _aborted;
 
     private CompositeReadableBuffer _dataBuffer;
     private ReadableBuffer _dataView;
@@ -479,6 +480,17 @@ public class DeliveryImpl implements Delivery
         _complete = true;
     }
 
+    void setAborted()
+    {
+        _aborted = true;
+    }
+
+    @Override
+    public boolean isAborted()
+    {
+        return _aborted;
+    }
+
     @Override
     public boolean isPartial()
     {

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 567e8eb..e473675 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -302,7 +302,8 @@ class TransportSession
         }
         _unsettledIncomingSize++;
 
-        if (payload != null)
+        boolean aborted = transfer.getAborted();
+        if (payload != null && !aborted)
         {
             delivery.append(payload);
             getSession().incrementIncomingBytes(payload.getLength());
@@ -310,19 +311,20 @@ class TransportSession
 
         delivery.updateWork();
 
-        if(!transfer.getMore() || transfer.getAborted())
+        if(!transfer.getMore() || aborted)
         {
             transportReceiver.setIncomingDeliveryId(null);
-        }
+            if(aborted) {
+                delivery.setAborted();
+            } else {
+                delivery.setComplete();
+            }
 
-        if(!(transfer.getMore() || transfer.getAborted()))
-        {
-            delivery.setComplete();
             delivery.getLink().getTransportLink().decrementLinkCredit();
             delivery.getLink().getTransportLink().incrementDeliveryCount();
         }
 
-        if(Boolean.TRUE.equals(transfer.getSettled()))
+        if(Boolean.TRUE.equals(transfer.getSettled()) || aborted)
         {
             delivery.setRemoteSettled(true);
         }

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/1f1408a3/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index d66964b..fbc9b8f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -2975,6 +2975,11 @@ public class TransportImplTest
 
     private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
     {
+        handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, aborted, null);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted, Boolean settled)
+    {
         byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
 
         Transfer transfer = new Transfer();
@@ -2987,6 +2992,9 @@ public class TransportImplTest
             // Can be omitted in continuation frames for a given delivery.
             transfer.setDeliveryId(deliveryId);
         }
+        if(settled != null) {
+            transfer.setSettled(settled);
+        }
 
         transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
     }
@@ -3383,4 +3391,143 @@ public class TransportImplTest
         verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
         verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
     }
+
+    @Test
+    public void testAbortedDelivery() {
+        // Check aborted=true, more=false, settled=true.
+        doAbortedDeliveryTestImpl(false, true);
+        // Check aborted=true, more=false, settled=unset(false)
+        // Aborted overrides settled not being set.
+        doAbortedDeliveryTestImpl(false, null);
+        // Check aborted=true, more=false, settled=false
+        // Aborted overrides settled being explicitly false.
+        doAbortedDeliveryTestImpl(false, false);
+
+        // Check aborted=true, more=true, settled=true
+        // Aborted overrides the more=true.
+        doAbortedDeliveryTestImpl(true, true);
+        // Check aborted=true, more=true, settled=unset(false)
+        // Aborted overrides the more=true, and settled being unset.
+        doAbortedDeliveryTestImpl(true, null);
+        // Check aborted=true, more=true, settled=false
+        // Aborted overrides the more=true, and settled explicitly false.
+        doAbortedDeliveryTestImpl(true, false);
+    }
+
+    private void doAbortedDeliveryTestImpl(boolean setMoreOnAbortedTransfer, Boolean setSettledOnAbortedTransfer) {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(3);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+        String deliveryTag3 = "tag3";
+
+        // Receive first delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 1 }, true);
+        assertEquals("Unexpected incoming bytes count", 1, session.getIncomingBytes());
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 2 }, false);
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+
+        // Receive first transfer for a multi-frame delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 3 }, true);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+        // Receive second transfer for a multi-frame delivery, indicating it is aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 4 }, setMoreOnAbortedTransfer, true, setSettledOnAbortedTransfer);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        // The aborted frame payload, if any, is dropped. Earlier payload could have already been read and was
+        // previously accounted for; it is incomplete, so it is left alone for the regular cleanup accounting.
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Receive transfers for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 5 }, true);
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 6 }, false);
+        assertEquals("Unexpected queued count", 3, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 5, session.getIncomingBytes());
+
+        // Check the first delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2 });
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 2, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Check the aborted delivery
+        Delivery delivery = receiver1.current();
+        assertTrue(Arrays.equals(deliveryTag2.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+        assertTrue(delivery.isAborted());
+        assertTrue(delivery.remotelySettled()); // Since aborted implicitly means it is settled.
+        assertTrue(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[delivery.pending()];
+        int len = receiver1.recv(received, 0, BUFFER_SIZE);
+        assertEquals("unexpected length", len, received.length);
+
+        assertArrayEquals("Received delivery payload not as expected", new byte[] { 3 }, received);
+
+        assertTrue("receiver did not advance", receiver1.advance());
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected credit", 1, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+
+        // Check the third delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 5, 6 });
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+        assertEquals("Unexpected credit", 0, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+        // Flow new credit and check delivery-count + credit on wire are as expected.
+        receiver1.flow(123);
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Flow);
+        Flow sentFlow = (Flow) transport.writes.get(4);
+
+        assertEquals("Unexpected delivery count", UnsignedInteger.valueOf(3), sentFlow.getDeliveryCount());
+        assertEquals("Unexpected credit", UnsignedInteger.valueOf(123), sentFlow.getLinkCredit());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org