You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:44 UTC
[qpid-broker-j] 01/05: QPID-8349: [Tests][AMQP 1.0] Fix
ExistingQueueAdmin
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit af83b6db0463d095f626592e15a9e4f5297281f7
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Aug 22 13:01:42 2019 +0100
QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin
---
.../tests/protocol/v1_0/ExistingQueueAdmin.java | 99 +++++++++-------------
1 file changed, 41 insertions(+), 58 deletions(-)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
index 313c4ff..d789b1a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -31,8 +31,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -48,6 +46,7 @@ public class ExistingQueueAdmin implements QueueAdmin
{
private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
+ private static final int DRAIN_CREDITS = 1000;
@Override
public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
@@ -58,14 +57,7 @@ public class ExistingQueueAdmin implements QueueAdmin
@Override
public void deleteQueue(final BrokerAdmin brokerAdmin, final String queueName)
{
- try
- {
- drainQueue(brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP), queueName);
- }
- catch (Exception e)
- {
- throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
- }
+ drain(queueName, brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP));
}
@Override
@@ -96,6 +88,18 @@ public class ExistingQueueAdmin implements QueueAdmin
return true;
}
+ private void drain(final String queueName, final InetSocketAddress brokerAddress)
+ {
+ try
+ {
+ drainQueue(brokerAddress, queueName);
+ }
+ catch (Exception e)
+ {
+ throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
+ }
+ }
+
private void putMessageOnQueue(final InetSocketAddress brokerAddress,
final String queueName,
final String... message) throws Exception
@@ -133,10 +137,9 @@ public class ExistingQueueAdmin implements QueueAdmin
{
interaction.detachClose(true)
.detach()
- .consumeResponse(Detach.class)
.end()
- .consumeResponse(End.class)
- .doCloseConnection();
+ .close()
+ .sync();
}
@@ -152,73 +155,53 @@ public class ExistingQueueAdmin implements QueueAdmin
.attachRole(Role.RECEIVER)
.attachSndSettleMode(SenderSettleMode.SETTLED)
.attachSourceAddress(queueName)
- .attach().consumeResponse();
-
+ .attach().consumeResponse(Attach.class)
+ .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+ .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId())
+ .flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS))
+ .flowHandleFromLinkHandle()
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowDrain(Boolean.TRUE)
+ .flow();
boolean received;
- final Begin begin = interaction.getCachedResponse(Begin.class);
- int nextIncomingId = begin.getNextOutgoingId().intValue();
do
{
- received = receive(interaction, queueName, nextIncomingId);
- nextIncomingId++;
+ received = receive(interaction, queueName);
}
while (received);
closeInteraction(interaction);
}
}
- private boolean receive(final Interaction interaction, String queueName, int nextIncomingId) throws Exception
+ private boolean receive(final Interaction interaction, String queueName) throws Exception
{
- interaction.flowIncomingWindow(UnsignedInteger.MAX_VALUE)
- .flowNextIncomingId(UnsignedInteger.valueOf(nextIncomingId))
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowDrain(Boolean.TRUE)
- .flowHandleFromLinkHandle()
- .flowOutgoingWindow(UnsignedInteger.ZERO)
- .flowNextOutgoingId(UnsignedInteger.ZERO)
- .flow();
-
+ boolean transferExpected;
boolean messageReceived = false;
- boolean flowReceived = false;
do
{
- Response<?> latestResponse;
- try
- {
- latestResponse = interaction.consumeResponse(Transfer.class, Flow.class).getLatestResponse();
- }
- catch (IllegalStateException e)
- {
- if (messageReceived)
- {
- LOGGER.debug(
- "Message was received on draining queue '{}' but flow was not. Assuming successful receive...",
- queueName,
- e);
- }
- else
- {
- LOGGER.warn(
- "Neither message no flow was received on draining queue '{}'. Assuming no messages on the queue...",
- queueName,
- e);
- }
- return messageReceived;
- }
- if (latestResponse.getBody() instanceof Transfer)
+ final Response<?> latestResponse =
+ interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse();
+ if (latestResponse != null && latestResponse.getBody() instanceof Transfer)
{
Transfer responseTransfer = (Transfer) latestResponse.getBody();
- if (!Boolean.TRUE.equals(responseTransfer.getMore()))
+ transferExpected = Boolean.TRUE.equals(responseTransfer.getMore());
+ if (!transferExpected)
{
messageReceived = true;
}
}
- else if (latestResponse.getBody() instanceof Flow)
+ else if (latestResponse != null && latestResponse.getBody() instanceof Flow)
+ {
+ transferExpected = false;
+ }
+ else
{
- flowReceived = true;
+ LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName);
+ transferExpected = false;
}
}
- while (!flowReceived);
+ while (transferExpected);
return messageReceived;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org