You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/26 17:14:04 UTC

[2/2] qpid-broker-j git commit: QPID-8085: [Broker-J][AMQP 1.0] Optimize the sending of flow performatives from broker sending link endpoint

QPID-8085: [Broker-J][AMQP 1.0] Optimize the sending of flow performatives from broker sending link endpoint


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c49dc0f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c49dc0f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c49dc0f4

Branch: refs/heads/master
Commit: c49dc0f473c6e727caa1143cb155774607bc23a2
Parents: 557fe9c
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jan 26 16:55:54 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jan 26 17:10:31 2018 +0000

----------------------------------------------------------------------
 .../protocol/v1_0/ConsumerTarget_1_0.java       |   1 -
 .../protocol/v1_0/messaging/TransferTest.java   |   3 +-
 .../protocol/v1_0/transport/link/FlowTest.java  | 238 ++++++++++++++++++-
 3 files changed, 239 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4798815..ebcaa60 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -470,7 +470,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                             {
                                 _linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
                             }
-                            _linkEndpoint.sendFlowConditional();
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 68d7179..28ab7ea 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -1020,7 +1020,8 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .disposition();
 
             // make sure sure the disposition is handled by making drain request
-            interaction.flowLinkCredit(UnsignedInteger.ZERO)
+            interaction.flowLinkCredit(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages))
                        .flowDrain(Boolean.TRUE)
                        .flow()
                        .consumeResponse(Flow.class);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 1a38cf0..75f4d14 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -41,8 +41,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
-import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -195,4 +196,239 @@ public class FlowTest extends BrokerAdminUsingTestBase
             assertThat(responseEnd.getError().getCondition(), is(equalTo(SessionError.UNATTACHED_HANDLE)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.6.8",
+            description = "Synchronous get with a timeout is accomplished by incrementing the link-credit,"
+                          + " sending the updated flow state and waiting for the link-credit to be consumed."
+                          + " When the desired time has elapsed the receiver then sets the drain flag and sends"
+                          + " the newly updated flow state again, while continuing to wait for the link-credit"
+                          + " to be consumed.")
+    public void synchronousGetWithTimeoutEmptyQueue() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction()
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.RECEIVER)
+                                               .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class);
+
+            Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+            UnsignedInteger remoteHandle = remoteAttach.getHandle();
+            assertThat(remoteHandle, is(notNullValue()));
+
+            Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+                                           .flowNextIncomingId(UnsignedInteger.ZERO)
+                                           .flowLinkCredit(UnsignedInteger.ONE)
+                                           .flowDrain(Boolean.FALSE)
+                                           .flowEcho(Boolean.TRUE)
+                                           .flowHandleFromLinkHandle()
+                                           .flow()
+                                           .consumeResponse().getLatestResponse(Flow.class);
+
+            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ONE)));
+            assertThat(responseFlow.getDrain(), is(equalTo(Boolean.FALSE)));
+
+            responseFlow = interaction.flowLinkCredit(UnsignedInteger.ONE)
+                                      .flowDrain(Boolean.TRUE)
+                                      .flowEcho(Boolean.FALSE)
+                                      .flowHandleFromLinkHandle()
+                                      .flow()
+                                      .consumeResponse().getLatestResponse(Flow.class);
+
+            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "2.6.8",
+            description = "Synchronous get with a timeout is accomplished by incrementing the link-credit,"
+                          + " sending the updated flow state and waiting for the link-credit to be consumed."
+                          + " When the desired time has elapsed the receiver then sets the drain flag and sends"
+                          + " the newly updated flow state again, while continuing to wait for the link-credit"
+                          + " to be consumed.")
+    public void synchronousGetWithTimeoutNonEmptyQueue() throws Exception
+    {
+        BrokerAdmin brokerAdmin = getBrokerAdmin();
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        String messageContent = "Test";
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
+
+        final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction()
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.RECEIVER)
+                                               .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class);
+
+            Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+            UnsignedInteger remoteHandle = remoteAttach.getHandle();
+            assertThat(remoteHandle, is(notNullValue()));
+
+            Object receivedMessageContent = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+                                                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                       .flowLinkCredit(UnsignedInteger.ONE)
+                                                       .flowDrain(Boolean.FALSE)
+                                                       .flowEcho(Boolean.FALSE)
+                                                       .flowHandleFromLinkHandle()
+                                                       .flow()
+                                                       .receiveDelivery()
+                                                       .decodeLatestDelivery()
+                                                       .getDecodedLatestDelivery();
+
+            assertThat(receivedMessageContent, is(equalTo(messageContent)));
+            assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+
+            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                                           .flowLinkCredit(UnsignedInteger.ONE)
+                                           .flowDrain(Boolean.TRUE)
+                                           .flowEcho(Boolean.FALSE)
+                                           .flowHandleFromLinkHandle()
+                                           .flow()
+                                           .consumeResponse().getLatestResponse(Flow.class);
+
+            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "2.6.9",
+            description = "Asynchronous notification can be accomplished as follows."
+                          + " The receiver maintains a target amount of link-credit for that link."
+                          + " As transfer arrive on the link, the sender’s link-credit decreases"
+                          + " as the delivery-count increases. When the sender’s link-credit falls below a threshold,"
+                          + " the flow state MAY be sent to increase the sender’s link-credit back"
+                          + " to the desired target amount.")
+    public void asynchronousNotification() throws Exception
+    {
+        BrokerAdmin brokerAdmin = getBrokerAdmin();
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        String messageContent1 = "Test1";
+        String messageContent2 = "Test2";
+        String messageContent3 = "Test2";
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent3);
+
+        final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction()
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.RECEIVER)
+                                               .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class);
+
+            Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+            UnsignedInteger remoteHandle = remoteAttach.getHandle();
+            assertThat(remoteHandle, is(notNullValue()));
+
+            UnsignedInteger delta = UnsignedInteger.ONE;
+            UnsignedInteger incomingWindow = UnsignedInteger.valueOf(3);
+            Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
+                                                        .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                        .flowLinkCredit(delta)
+                                                        .flowHandleFromLinkHandle()
+                                                        .flow()
+                                                        .receiveDelivery()
+                                                        .decodeLatestDelivery()
+                                                        .getDecodedLatestDelivery();
+
+            assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
+            assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+
+            Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow)
+                                                        .flowNextIncomingId(UnsignedInteger.ONE)
+                                                        .flowLinkCredit(delta)
+                                                        .flowHandleFromLinkHandle()
+                                                        .flow()
+                                                        .receiveDelivery()
+                                                        .decodeLatestDelivery()
+                                                        .getDecodedLatestDelivery();
+
+            assertThat(receivedMessageContent2, is(equalTo(messageContent2)));
+            assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ONE)));
+
+            // send session flow with echo=true to verify that no message is delivered without issuing a credit
+            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2))
+                                           .flowLinkCredit(null)
+                                           .flowHandle(null)
+                                           .flowEcho(Boolean.TRUE)
+                                           .flow()
+                                           .consumeResponse().getLatestResponse(Flow.class);
+
+            assertThat(responseFlow.getHandle(), is(nullValue()));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.10",
+            description = "Stopping the transfers on a given link is accomplished by updating the link-credit"
+                          + " to be zero and sending the updated flow state. [...]"
+                          + " The echo field of the flow frame MAY be used to request the sender’s flow state"
+                          + " be echoed back. This MAY be used to determine when the link has finally quiesced.")
+    public void stoppingALink() throws Exception
+    {
+        BrokerAdmin brokerAdmin = getBrokerAdmin();
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        String messageContent1 = "Test1";
+        String messageContent2 = "Test2";
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
+        brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
+
+        final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction()
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.RECEIVER)
+                                               .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class);
+
+            Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+            UnsignedInteger remoteHandle = remoteAttach.getHandle();
+            assertThat(remoteHandle, is(notNullValue()));
+
+            Object receivedMessageContent1 = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
+                                                        .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                        .flowLinkCredit(UnsignedInteger.ONE)
+                                                        .flowHandleFromLinkHandle()
+                                                        .flow()
+                                                        .receiveDelivery()
+                                                        .decodeLatestDelivery()
+                                                        .getDecodedLatestDelivery();
+
+            assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
+            assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+
+            Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                                           .flowLinkCredit(UnsignedInteger.ZERO)
+                                           .flowHandleFromLinkHandle()
+                                           .flowEcho(Boolean.TRUE)
+                                           .flow()
+                                           .consumeResponse().getLatestResponse(Flow.class);
+
+            assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+            assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+        }
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org