You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/21 16:35:44 UTC

[qpid-broker-j] 02/21: QPID-8349: [Tests][AMQP 1.0] Introduce field to count incoming deliveries in order to set flow next-incoming-id based on received message count and begin next-outgoing-id

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 80fc0b80617f06a8119c38cd9057375527eee964
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Mon Aug 19 16:30:35 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Introduce field to count incoming deliveries in order to set flow next-incoming-id based on received message count and begin next-outgoing-id
---
 .../qpid/tests/protocol/v1_0/Interaction.java      | 29 ++++++++++++++++++++++
 .../qpid/tests/protocol/AbstractInteraction.java   |  4 +--
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index ac30068..1437e19 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -30,12 +30,14 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -113,6 +115,8 @@ public class Interaction extends AbstractInteraction<Interaction>
     private Object _decodedLatestDelivery;
     private UnsignedInteger _latestDeliveryId;
     private Map<String, Object> _latestDeliveryApplicationProperties;
+    private Map<Class, FrameBody> _latestResponses = new HashMap<>();
+    private AtomicLong _receivedDeliveryCount = new AtomicLong();
 
     Interaction(final FrameTransport frameTransport)
     {
@@ -644,6 +648,12 @@ public class Interaction extends AbstractInteraction<Interaction>
         return flowNextIncomingId(_latestDeliveryId.add(UnsignedInteger.ONE));
     }
 
+    public Interaction flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+    {
+        final Begin begin = (Begin) _latestResponses.get(Begin.class);
+        return flowNextIncomingId(begin.getNextOutgoingId().add(UnsignedInteger.valueOf(_receivedDeliveryCount.get())));
+    }
+
     public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
     {
         _flow.setOutgoingWindow(outgoingWindow);
@@ -680,6 +690,12 @@ public class Interaction extends AbstractInteraction<Interaction>
         return this;
     }
 
+    public Interaction flowDeliveryCount()
+    {
+        _flow.setDeliveryCount(UnsignedInteger.valueOf(_receivedDeliveryCount.get()));
+        return this;
+    }
+
     public Interaction flowLinkCredit(final UnsignedInteger linkCredit)
     {
         _flow.setLinkCredit(linkCredit);
@@ -1071,6 +1087,7 @@ public class Interaction extends AbstractInteraction<Interaction>
         sync();
         _latestDelivery = receiveAllTransfers(ignore);
         _latestDeliveryId = _latestDelivery.size() > 0 ? _latestDelivery.get(0).getDeliveryId() : null;
+        _receivedDeliveryCount.incrementAndGet();
         return this;
     }
 
@@ -1163,4 +1180,16 @@ public class Interaction extends AbstractInteraction<Interaction>
         while (completed == null);
         return completed;
     }
+
+    @Override
+    protected Response<?> getNextResponse() throws Exception
+    {
+        Response<?> response = super.getNextResponse();
+        if (response != null && response.getBody() instanceof FrameBody)
+        {
+            _latestResponses.put(response.getBody().getClass(), (FrameBody)response.getBody());
+        }
+        return response;
+    }
+
 }
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
index 2c977f3..f9640dd 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -84,12 +84,12 @@ public abstract class AbstractInteraction<I extends AbstractInteraction<I>>
         return getInteraction();
     }
 
-    public Response<?> getLatestResponse() throws Exception
+    public Response<?> getLatestResponse()
     {
         return _latestResponse;
     }
 
-    public <T> T getLatestResponse(Class<T> type) throws Exception
+    public <T> T getLatestResponse(Class<T> type)
     {
         if (_latestResponse.getBody() == null)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org