You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/09 17:03:10 UTC

[7/9] qpid-jms git commit: consolidate the handling of consumer flow frames and the subsequent transfer and drain flow responses

Consolidate the handling of consumer flow frames and the subsequent transfer and drain flow responses.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b5e981c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b5e981c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b5e981c4

Branch: refs/heads/master
Commit: b5e981c49d11cc925a4d55ffe5b5d5bf8c00e5d2
Parents: 1b75b7e
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Dec 9 14:40:03 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Dec 9 14:40:03 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 155 +++++++++++--------
 1 file changed, 93 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5e981c4/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 0152cbf..cca5ab5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -707,61 +707,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher)
     {
-        Matcher<Boolean> drainMatcher = null;
-        if(drain)
-        {
-            drainMatcher = equalTo(true);
-        }
-        else
-        {
-            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
-        }
-
-        final FlowMatcher flowMatcher = new FlowMatcher()
-                        .withLinkCredit(creditMatcher)
-                        .withHandle(Matchers.notNullValue())
-                        .withDrain(drainMatcher);
-
-        if(drain && sendDrainFlowResponse)
-        {
-            final FlowFrame drainResponse = new FlowFrame();
-            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
-            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
-            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
-            drainResponse.setDrain(true);
-
-            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
-            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
-            flowResponseSender.setValueProvider(new ValueProvider()
-            {
-                @Override
-                public void setValues()
-                {
-                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
-                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
-                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
-                    drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
-                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
-                }
-            });
-
-            flowMatcher.onSuccess(flowResponseSender);
-        }
-
-        addHandler(flowMatcher);
-    }
-
-    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
-        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
-
-        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
-    }
-
-    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
-        UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
-        UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
-
-        return dc.add(lc);
+        expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
@@ -770,7 +716,8 @@ public class TestAmqpPeer implements AutoCloseable
                                                  final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                  final DescribedType content)
     {
-        expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, 1);
+        expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+                                          appPropertiesDescribedType, content, 1);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
@@ -780,19 +727,54 @@ public class TestAmqpPeer implements AutoCloseable
                                                   final DescribedType content,
                                                   final int count)
     {
-        if(count <= 0)
+        expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+                                          appPropertiesDescribedType, content, count, false, false,
+                                          Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1);
+    }
+
+    public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
+            final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+            final PropertiesDescribedType propertiesDescribedType,
+            final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+            final DescribedType content,
+            final int count,
+            final boolean drain,
+            final boolean sendDrainFlowResponse,
+            Matcher<UnsignedInteger> creditMatcher,
+            final Integer nextIncomingId)
+    {
+        if (nextIncomingId == null && count > 0)
+        {
+            throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested");
+        }
+
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
         {
-            throw new IllegalArgumentException("Message count must be >= 1");
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
         }
 
-        int nextIncomingId = 1; // TODO: we shouldn't assume this will be the first transfer on the session
+        Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
+        if(nextIncomingId != null)
+        {
+             remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
+        }
+        else
+        {
+            remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
+        }
 
         final FlowMatcher flowMatcher = new FlowMatcher()
                         .withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)))
-                        .withDrain(Matchers.anyOf(equalTo(false), nullValue()))
-                        .withNextIncomingId(Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)));
+                        .withDrain(drainMatcher)
+                        .withNextIncomingId(remoteNextIncomingIdMatcher);
 
         CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+        boolean addComposite = false;
 
         for(int i = 0; i < count; i++)
         {
@@ -822,14 +804,63 @@ public class TestAmqpPeer implements AutoCloseable
                 }
             });
 
+            addComposite = true;
             composite.add(transferResponseSender);
         }
 
-        flowMatcher.onSuccess(composite);
+        if(drain && sendDrainFlowResponse)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count));
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            addComposite = true;
+            composite.add(flowResponseSender);
+        }
+
+        if(addComposite) {
+            flowMatcher.onSuccess(composite);
+        }
 
         addHandler(flowMatcher);
     }
 
+    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+    }
+
+    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+        UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+        UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+        return dc.add(lc);
+    }
+
+    private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount) {
+        UnsignedInteger nid = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId();
+
+        return nid.add(UnsignedInteger.valueOf(sentCount));
+    }
+
     private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType,
                                           final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                           final PropertiesDescribedType propertiesDescribedType,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org