You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/05/01 15:24:00 UTC

[qpid-proton-j] branch master updated: PROTON-2029 Add fix and test for extraneous disposition after settle

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git


The following commit(s) were added to refs/heads/master by this push:
     new ad0a7de  PROTON-2029 Add fix and test for extraneous disposition after settle
ad0a7de is described below

commit ad0a7debf47a4e7cdbcb97c3cf36d185c8863c18
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Mon Apr 29 15:01:19 2019 -0400

    PROTON-2029 Add fix and test for extraneous disposition after settle
    
    Adds tests for disposition change after settled and a check for settled
    state to circumvent more transport work being added when the delivery
    has already been tagged as settled.
---
 .../qpid/proton/engine/impl/DeliveryImpl.java      |   4 +
 .../qpid/proton/engine/impl/TransportImplTest.java | 159 +++++++++++++++++++++
 2 files changed, 163 insertions(+)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 251c68a..7312bef 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -132,6 +132,10 @@ public class DeliveryImpl implements Delivery
     @Override
     public void disposition(final DeliveryState state)
     {
+        if (_settled) {
+            return;
+        }
+
         _deliveryState = state;
         if(!_remoteSettled)
         {
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 8b81355..4584cdb 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -3874,4 +3874,163 @@ public class TransportImplTest
         assertEquals(TransportImpl.OUTGOING, eventRef.get());
         assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
     }
+
+    /**
+     * Verify that no Disposition frame is emitted by the Transport should a Delivery
+     * have disposition applied after the delivery has been settled previously.
+     */
+    @Test
+    public void testNoDispositionUpdatesAfterSettlementProceessedSender()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.flow(5);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
+
+        Delivery delivery = receiver.current();
+        assertNull("Should not yet have a delivery", delivery);
+
+        // Send the necessary responses to open/begin/attach as well as a transfer
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        String deliveryTag = "tag1";
+        String messageContent = "content1";
+        handleTransfer(transport, 1, deliveryTag, messageContent);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        delivery = verifyDelivery(receiver, deliveryTag, messageContent);
+        assertNotNull("Should now have a delivery", delivery);
+
+        delivery.disposition(Accepted.getInstance());
+        delivery.settle();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Disposition);
+
+        // Should not produce any new frames being written
+        delivery.disposition(Accepted.getInstance());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
+    }
+
+    /**
+     * Verify that no Disposition frame is emitted by the Transport should a Delivery
+     * have disposition applied after the delivery has been settled previously.
+     */
+    @Test
+    public void testNoDispositionUpdatesAfterSettlementProceessedReceiver()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myReceiver";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        // Send the necessary responses to open/begin/attach as well as a transfer
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int credit = 1;
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setDrain(true);
+        flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+        delivery.disposition(Accepted.getInstance());
+        delivery.settle();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Disposition);
+
+        // Should not produce any new frames being written
+        delivery.disposition(Accepted.getInstance());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org