You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/28 06:38:05 UTC
[qpid-broker-j] 01/08: QPID-8349: [Tests][AMQP 1.0] Add ability
into ExistingQueueAdmin to drain queues using 'settled' or 'unsettled'
sender settle modes
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 757ba6d6a1ae76eb6d1df0db370d77c7112e1085
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Tue Aug 27 16:28:41 2019 +0100
QPID-8349: [Tests][AMQP 1.0] Add ability into ExistingQueueAdmin to drain queues using 'settled' or 'unsettled' sender settle modes
---
.../tests/protocol/v1_0/ExistingQueueAdmin.java | 70 ++++++++++------------
1 file changed, 32 insertions(+), 38 deletions(-)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
index d789b1a..648c03c 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.server.util.StringUtil;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminException;
import org.apache.qpid.tests.utils.QueueAdmin;
@@ -47,6 +47,8 @@ public class ExistingQueueAdmin implements QueueAdmin
private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
private static final int DRAIN_CREDITS = 1000;
+ private static final boolean DRAIN_UNSETTLED =
+ Boolean.getBoolean("qpid.tests.protocol.broker.external.existingQueueAdmin.drainUnsettled");
@Override
public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
@@ -145,6 +147,8 @@ public class ExistingQueueAdmin implements QueueAdmin
private void drainQueue(final InetSocketAddress brokerAddress, final String queueName) throws Exception
{
+ final String controlMessage = String.format("---%s---", new StringUtil().randomAlphaNumericString(32));
+ putMessageOnQueue(brokerAddress, queueName, controlMessage);
try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -153,55 +157,45 @@ public class ExistingQueueAdmin implements QueueAdmin
.begin().consumeResponse()
.attachName(ADMIN_LINK_NAME)
.attachRole(Role.RECEIVER)
- .attachSndSettleMode(SenderSettleMode.SETTLED)
+ .attachSndSettleMode(DRAIN_UNSETTLED ? SenderSettleMode.UNSETTLED : SenderSettleMode.SETTLED)
.attachSourceAddress(queueName)
.attach().consumeResponse(Attach.class)
- .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
- .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId())
+ .flowIncomingWindow(UnsignedInteger.valueOf(DRAIN_CREDITS))
+ .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
.flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS))
.flowHandleFromLinkHandle()
.flowOutgoingWindow(UnsignedInteger.ZERO)
.flowNextOutgoingId(UnsignedInteger.ZERO)
- .flowDrain(Boolean.TRUE)
.flow();
- boolean received;
+
+ boolean controlMessageReceived;
do
{
- received = receive(interaction, queueName);
- }
- while (received);
- closeInteraction(interaction);
- }
- }
+ interaction.receiveDelivery(Flow.class);
+ try
+ {
+ interaction.decodeLatestDelivery();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Message decoding failed", e);
+ }
- private boolean receive(final Interaction interaction, String queueName) throws Exception
- {
- boolean transferExpected;
- boolean messageReceived = false;
- do
- {
- final Response<?> latestResponse =
- interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse();
- if (latestResponse != null && latestResponse.getBody() instanceof Transfer)
- {
- Transfer responseTransfer = (Transfer) latestResponse.getBody();
- transferExpected = Boolean.TRUE.equals(responseTransfer.getMore());
- if (!transferExpected)
+ final Object message = interaction.getDecodedLatestDelivery();
+ if (DRAIN_UNSETTLED)
{
- messageReceived = true;
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionFirstFromLatestDelivery()
+ .dispositionState(new Accepted())
+ .disposition();
}
+
+ controlMessageReceived = controlMessage.equals(message);
}
- else if (latestResponse != null && latestResponse.getBody() instanceof Flow)
- {
- transferExpected = false;
- }
- else
- {
- LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName);
- transferExpected = false;
- }
+ while (!controlMessageReceived);
+ closeInteraction(interaction);
}
- while (transferExpected);
- return messageReceived;
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org