You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:44 UTC

[qpid-broker-j] 01/05: QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit af83b6db0463d095f626592e15a9e4f5297281f7
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Aug 22 13:01:42 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin
---
 .../tests/protocol/v1_0/ExistingQueueAdmin.java    | 99 +++++++++-------------
 1 file changed, 41 insertions(+), 58 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
index 313c4ff..d789b1a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -31,8 +31,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -48,6 +46,7 @@ public class ExistingQueueAdmin implements QueueAdmin
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
     private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
+    private static final int DRAIN_CREDITS = 1000;
 
     @Override
     public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
@@ -58,14 +57,7 @@ public class ExistingQueueAdmin implements QueueAdmin
     @Override
     public void deleteQueue(final BrokerAdmin brokerAdmin, final String queueName)
     {
-        try
-        {
-            drainQueue(brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP), queueName);
-        }
-        catch (Exception e)
-        {
-            throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
-        }
+        drain(queueName, brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP));
     }
 
     @Override
@@ -96,6 +88,18 @@ public class ExistingQueueAdmin implements QueueAdmin
         return true;
     }
 
+    private void drain(final String queueName, final InetSocketAddress brokerAddress)
+    {
+        try
+        {
+            drainQueue(brokerAddress, queueName);
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
+        }
+    }
+
     private void putMessageOnQueue(final InetSocketAddress brokerAddress,
                                    final String queueName,
                                    final String... message) throws Exception
@@ -133,10 +137,9 @@ public class ExistingQueueAdmin implements QueueAdmin
     {
         interaction.detachClose(true)
                    .detach()
-                   .consumeResponse(Detach.class)
                    .end()
-                   .consumeResponse(End.class)
-                   .doCloseConnection();
+                   .close()
+                   .sync();
     }
 
 
@@ -152,73 +155,53 @@ public class ExistingQueueAdmin implements QueueAdmin
                        .attachRole(Role.RECEIVER)
                        .attachSndSettleMode(SenderSettleMode.SETTLED)
                        .attachSourceAddress(queueName)
-                       .attach().consumeResponse();
-
+                       .attach().consumeResponse(Attach.class)
+                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                       .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId())
+                       .flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS))
+                       .flowHandleFromLinkHandle()
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowDrain(Boolean.TRUE)
+                       .flow();
             boolean received;
-            final Begin begin = interaction.getCachedResponse(Begin.class);
-            int nextIncomingId = begin.getNextOutgoingId().intValue();
             do
             {
-                received = receive(interaction, queueName, nextIncomingId);
-                nextIncomingId++;
+                received = receive(interaction, queueName);
             }
             while (received);
             closeInteraction(interaction);
         }
     }
 
-    private boolean receive(final Interaction interaction, String queueName, int nextIncomingId) throws Exception
+    private boolean receive(final Interaction interaction, String queueName) throws Exception
     {
-        interaction.flowIncomingWindow(UnsignedInteger.MAX_VALUE)
-                   .flowNextIncomingId(UnsignedInteger.valueOf(nextIncomingId))
-                   .flowLinkCredit(UnsignedInteger.ONE)
-                   .flowDrain(Boolean.TRUE)
-                   .flowHandleFromLinkHandle()
-                   .flowOutgoingWindow(UnsignedInteger.ZERO)
-                   .flowNextOutgoingId(UnsignedInteger.ZERO)
-                   .flow();
-
+        boolean transferExpected;
         boolean messageReceived = false;
-        boolean flowReceived = false;
         do
         {
-            Response<?> latestResponse;
-            try
-            {
-                latestResponse = interaction.consumeResponse(Transfer.class, Flow.class).getLatestResponse();
-            }
-            catch (IllegalStateException e)
-            {
-                if (messageReceived)
-                {
-                    LOGGER.debug(
-                            "Message was received on draining queue '{}' but flow was not. Assuming successful receive...",
-                            queueName,
-                            e);
-                }
-                else
-                {
-                    LOGGER.warn(
-                            "Neither message no flow was received on draining queue '{}'.  Assuming no messages on the queue...",
-                            queueName,
-                            e);
-                }
-                return messageReceived;
-            }
-            if (latestResponse.getBody() instanceof Transfer)
+            final Response<?> latestResponse =
+                    interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse();
+            if (latestResponse != null && latestResponse.getBody() instanceof Transfer)
             {
                 Transfer responseTransfer = (Transfer) latestResponse.getBody();
-                if (!Boolean.TRUE.equals(responseTransfer.getMore()))
+                transferExpected = Boolean.TRUE.equals(responseTransfer.getMore());
+                if (!transferExpected)
                 {
                     messageReceived = true;
                 }
             }
-            else if (latestResponse.getBody() instanceof Flow)
+            else if (latestResponse != null && latestResponse.getBody() instanceof Flow)
+            {
+                transferExpected = false;
+            }
+            else
             {
-                flowReceived = true;
+                LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName);
+                transferExpected = false;
             }
         }
-        while (!flowReceived);
+        while (transferExpected);
         return messageReceived;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org