You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/28 06:38:05 UTC

[qpid-broker-j] 01/08: QPID-8349: [Tests][AMQP 1.0] Add ability into ExistingQueueAdmin to drain queues using 'settlled' or 'unsettled' sender setlle modes

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 757ba6d6a1ae76eb6d1df0db370d77c7112e1085
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Tue Aug 27 16:28:41 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Add ability into ExistingQueueAdmin to drain queues using 'settlled' or 'unsettled' sender setlle modes
---
 .../tests/protocol/v1_0/ExistingQueueAdmin.java    | 70 ++++++++++------------
 1 file changed, 32 insertions(+), 38 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
index d789b1a..648c03c 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.server.util.StringUtil;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminException;
 import org.apache.qpid.tests.utils.QueueAdmin;
@@ -47,6 +47,8 @@ public class ExistingQueueAdmin implements QueueAdmin
     private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
     private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
     private static final int DRAIN_CREDITS = 1000;
+    private static final boolean DRAIN_UNSETTLED =
+            Boolean.getBoolean("qpid.tests.protocol.broker.external.existingQueueAdmin.drainUnsettled");
 
     @Override
     public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
@@ -145,6 +147,8 @@ public class ExistingQueueAdmin implements QueueAdmin
 
     private void drainQueue(final InetSocketAddress brokerAddress, final String queueName) throws Exception
     {
+        final String controlMessage = String.format("---%s---", new StringUtil().randomAlphaNumericString(32));
+        putMessageOnQueue(brokerAddress, queueName, controlMessage);
         try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
@@ -153,55 +157,45 @@ public class ExistingQueueAdmin implements QueueAdmin
                        .begin().consumeResponse()
                        .attachName(ADMIN_LINK_NAME)
                        .attachRole(Role.RECEIVER)
-                       .attachSndSettleMode(SenderSettleMode.SETTLED)
+                       .attachSndSettleMode(DRAIN_UNSETTLED ? SenderSettleMode.UNSETTLED : SenderSettleMode.SETTLED)
                        .attachSourceAddress(queueName)
                        .attach().consumeResponse(Attach.class)
-                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
-                       .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId())
+                       .flowIncomingWindow(UnsignedInteger.valueOf(DRAIN_CREDITS))
+                       .flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
                        .flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS))
                        .flowHandleFromLinkHandle()
                        .flowOutgoingWindow(UnsignedInteger.ZERO)
                        .flowNextOutgoingId(UnsignedInteger.ZERO)
-                       .flowDrain(Boolean.TRUE)
                        .flow();
-            boolean received;
+
+            boolean controlMessageReceived;
             do
             {
-                received = receive(interaction, queueName);
-            }
-            while (received);
-            closeInteraction(interaction);
-        }
-    }
+                interaction.receiveDelivery(Flow.class);
+                try
+                {
+                    interaction.decodeLatestDelivery();
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Message decoding failed", e);
+                }
 
-    private boolean receive(final Interaction interaction, String queueName) throws Exception
-    {
-        boolean transferExpected;
-        boolean messageReceived = false;
-        do
-        {
-            final Response<?> latestResponse =
-                    interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse();
-            if (latestResponse != null && latestResponse.getBody() instanceof Transfer)
-            {
-                Transfer responseTransfer = (Transfer) latestResponse.getBody();
-                transferExpected = Boolean.TRUE.equals(responseTransfer.getMore());
-                if (!transferExpected)
+                final Object message = interaction.getDecodedLatestDelivery();
+                if (DRAIN_UNSETTLED)
                 {
-                    messageReceived = true;
+                    interaction.dispositionSettled(true)
+                               .dispositionRole(Role.RECEIVER)
+                               .dispositionFirstFromLatestDelivery()
+                               .dispositionState(new Accepted())
+                               .disposition();
                 }
+
+                controlMessageReceived = controlMessage.equals(message);
             }
-            else if (latestResponse != null && latestResponse.getBody() instanceof Flow)
-            {
-                transferExpected = false;
-            }
-            else
-            {
-                LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName);
-                transferExpected = false;
-            }
+            while (!controlMessageReceived);
+            closeInteraction(interaction);
         }
-        while (transferExpected);
-        return messageReceived;
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org