You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/26 17:14:04 UTC
[2/2] qpid-broker-j git commit: QPID-8085: [Broker-J][AMQP 1.0]
Optimize the sending of flow performatives from broker sending link endpoint
QPID-8085: [Broker-J][AMQP 1.0] Optimize the sending of flow performatives from broker sending link endpoint
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c49dc0f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c49dc0f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c49dc0f4
Branch: refs/heads/master
Commit: c49dc0f473c6e727caa1143cb155774607bc23a2
Parents: 557fe9c
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jan 26 16:55:54 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jan 26 17:10:31 2018 +0000
----------------------------------------------------------------------
.../protocol/v1_0/ConsumerTarget_1_0.java | 1 -
.../protocol/v1_0/messaging/TransferTest.java | 3 +-
.../protocol/v1_0/transport/link/FlowTest.java | 238 ++++++++++++++++++-
3 files changed, 239 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4798815..ebcaa60 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -470,7 +470,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
_linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
}
- _linkEndpoint.sendFlowConditional();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 68d7179..28ab7ea 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -1020,7 +1020,8 @@ public class TransferTest extends BrokerAdminUsingTestBase
.disposition();
// make sure sure the disposition is handled by making drain request
- interaction.flowLinkCredit(UnsignedInteger.ZERO)
+ interaction.flowLinkCredit(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.valueOf(numberOfMessages))
.flowDrain(Boolean.TRUE)
.flow()
.consumeResponse(Flow.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c49dc0f4/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 1a38cf0..75f4d14 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -41,8 +41,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
-import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -195,4 +196,239 @@ public class FlowTest extends BrokerAdminUsingTestBase
assertThat(responseEnd.getError().getCondition(), is(equalTo(SessionError.UNATTACHED_HANDLE)));
}
}
+
+ @Test
+ @SpecificationTest(section = "2.6.8",
+ description = "Synchronous get with a timeout is accomplished by incrementing the link-credit,"
+ + " sending the updated flow state and waiting for the link-credit to be consumed."
+ + " When the desired time has elapsed the receiver then sets the drain flag and sends"
+ + " the newly updated flow state again, while continuing to wait for the link-credit"
+ + " to be consumed.")
+ public void synchronousGetWithTimeoutEmptyQueue() throws Exception
+ { // Nothing is put on the queue: offer one credit, then re-send it with the drain flag and check the Flow replies.
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class);
+ // Keep the handle from the Attach response so the Flow responses below can be matched to this link.
+ Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+ UnsignedInteger remoteHandle = remoteAttach.getHandle();
+ assertThat(remoteHandle, is(notNullValue()));
+ // First flow: offer one link-credit with drain=FALSE and echo=TRUE, then read the Flow that comes back.
+ Flow responseFlow = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowDrain(Boolean.FALSE)
+ .flowEcho(Boolean.TRUE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
+ // The returned Flow must carry this link's handle and report link-credit 1 with drain still FALSE.
+ assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+ assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ONE)));
+ assertThat(responseFlow.getDrain(), is(equalTo(Boolean.FALSE)));
+ // Second flow: repeat the same credit, this time with drain=TRUE and echo=FALSE, and read the next Flow.
+ responseFlow = interaction.flowLinkCredit(UnsignedInteger.ONE)
+ .flowDrain(Boolean.TRUE)
+ .flowEcho(Boolean.FALSE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
+ // After the drain request the returned Flow must report link-credit 0 for this link.
+ assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+ assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "2.6.8",
+ description = "Synchronous get with a timeout is accomplished by incrementing the link-credit,"
+ + " sending the updated flow state and waiting for the link-credit to be consumed."
+ + " When the desired time has elapsed the receiver then sets the drain flag and sends"
+ + " the newly updated flow state again, while continuing to wait for the link-credit"
+ + " to be consumed.")
+ public void synchronousGetWithTimeoutNonEmptyQueue() throws Exception
+ { // One message is queued: receive it with a single credit, then send a drain flow and check the Flow reply.
+ BrokerAdmin brokerAdmin = getBrokerAdmin();
+ brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ String messageContent = "Test";
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
+
+ final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class);
+ // Keep the handle from the Attach response so the final Flow response can be matched to this link.
+ Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+ UnsignedInteger remoteHandle = remoteAttach.getHandle();
+ assertThat(remoteHandle, is(notNullValue()));
+ // Offer one link-credit (drain=FALSE, echo=FALSE) and decode the delivery received in response.
+ Object receivedMessageContent = interaction.flowIncomingWindow(UnsignedInteger.valueOf(1))
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowDrain(Boolean.FALSE)
+ .flowEcho(Boolean.FALSE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
+ // The delivery must carry the queued content and have delivery-id 0.
+ assertThat(receivedMessageContent, is(equalTo(messageContent)));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ // Send one credit again with drain=TRUE and next-incoming-id set to 1, then read the Flow response.
+ Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowDrain(Boolean.TRUE)
+ .flowEcho(Boolean.FALSE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
+ // The returned Flow must carry this link's handle and report link-credit 0.
+ assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+ assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "2.6.9",
+ description = "Asynchronous notification can be accomplished as follows."
+ + " The receiver maintains a target amount of link-credit for that link."
+ + " As transfer arrive on the link, the sender’s link-credit decreases"
+ + " as the delivery-count increases. When the sender’s link-credit falls below a threshold,"
+ + " the flow state MAY be sent to increase the sender’s link-credit back"
+ + " to the desired target amount.")
+ public void asynchronousNotification() throws Exception
+ { // Three messages are queued: two are fetched one credit at a time, then a flow without credit is sent and a Flow reply is expected.
+ BrokerAdmin brokerAdmin = getBrokerAdmin();
+ brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ String messageContent1 = "Test1";
+ String messageContent2 = "Test2";
+ String messageContent3 = "Test3";
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent3);
+ // The three contents are distinct so the content assertions below also identify which message was delivered.
+ final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class);
+ // The Attach response must carry a link handle.
+ Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+ UnsignedInteger remoteHandle = remoteAttach.getHandle();
+ assertThat(remoteHandle, is(notNullValue()));
+ // Each link flow below sets link-credit to 'delta' (one); the incoming-window is 3 in both.
+ UnsignedInteger delta = UnsignedInteger.ONE;
+ UnsignedInteger incomingWindow = UnsignedInteger.valueOf(3);
+ Object receivedMessageContent1 = interaction.flowIncomingWindow(incomingWindow)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(delta)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
+ // The first credit must yield the first message with delivery-id 0.
+ assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ // Second credit, with next-incoming-id advanced to 1.
+ Object receivedMessageContent2 = interaction.flowIncomingWindow(incomingWindow)
+ .flowNextIncomingId(UnsignedInteger.ONE)
+ .flowLinkCredit(delta)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
+ // The second delivery must be the second message (not the third) with delivery-id 1.
+ assertThat(receivedMessageContent2, is(equalTo(messageContent2)));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ONE)));
+
+ // send session flow with echo=true to verify that no message is delivered without issuing a credit
+ Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.valueOf(2))
+ .flowLinkCredit(null)
+ .flowHandle(null)
+ .flowEcho(Boolean.TRUE)
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
+ // The response is expected to be a Flow with no handle set.
+ assertThat(responseFlow.getHandle(), is(nullValue()));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.10",
+ description = "Stopping the transfers on a given link is accomplished by updating the link-credit"
+ + " to be zero and sending the updated flow state. [...]"
+ + " The echo field of the flow frame MAY be used to request the sender’s flow state"
+ + " be echoed back. This MAY be used to determine when the link has finally quiesced.")
+ public void stoppingALink() throws Exception
+ { // Two messages are queued: take one, then send link-credit 0 with echo=TRUE and check the echoed Flow.
+ BrokerAdmin brokerAdmin = getBrokerAdmin();
+ brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ String messageContent1 = "Test1";
+ String messageContent2 = "Test2";
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent1);
+ brokerAdmin.putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent2);
+
+ final InetSocketAddress addr = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class);
+ // Keep the handle from the Attach response so the echoed Flow can be matched to this link.
+ Attach remoteAttach = interaction.getLatestResponse(Attach.class);
+ UnsignedInteger remoteHandle = remoteAttach.getHandle();
+ assertThat(remoteHandle, is(notNullValue()));
+ // Incoming-window is 2 but only one link-credit is offered: receive and decode a single delivery.
+ Object receivedMessageContent1 = interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery()
+ .getDecodedLatestDelivery();
+ // It must be the first message put on the queue, with delivery-id 0.
+ assertThat(receivedMessageContent1, is(equalTo(messageContent1)));
+ assertThat(interaction.getLatestDeliveryId(), is(equalTo(UnsignedInteger.ZERO)));
+ // Stop the link: set link-credit to 0 and set echo=TRUE, then read the Flow response.
+ Flow responseFlow = interaction.flowNextIncomingId(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ZERO)
+ .flowHandleFromLinkHandle()
+ .flowEcho(Boolean.TRUE)
+ .flow()
+ .consumeResponse().getLatestResponse(Flow.class);
+ // The returned Flow must carry this link's handle and report link-credit 0.
+ assertThat(responseFlow.getHandle(), is(equalTo(remoteHandle)));
+ assertThat(responseFlow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org