You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/05 15:49:32 UTC

[2/3] qpid-jms git commit: add support for using drain to stop links

add support for using drain to stop links


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fdc8e54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fdc8e54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fdc8e54

Branch: refs/heads/master
Commit: 0fdc8e549b344e894e9ff5b54e9d09b31560afb3
Parents: f994b35
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 5 12:46:05 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 5 14:49:05 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 61 +++++++++++++++++++-
 1 file changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fdc8e54/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index e252345..fc9ec43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory;
 // TODO should expectXXXYYYZZZ methods just be expect(matcher)?
 public class TestAmqpPeer implements AutoCloseable
 {
+    private static final int LINK_HANDLE_OFFSET = 100;
+
     private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
 
     private final TestAmqpPeerRunner _driverRunnable;
@@ -98,7 +100,7 @@ public class TestAmqpPeer implements AutoCloseable
      */
     private CountDownLatch _handlersCompletedLatch;
 
-    private volatile int _nextLinkHandle = 100;
+    private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
 
     private byte[] _deferredBytes;
 
@@ -700,13 +702,68 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow()
     {
+        expectLinkFlow(false);
+    }
+
+    public void expectLinkFlow(boolean drain)
+    {
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
+        {
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+        }
+
         final FlowMatcher flowMatcher = new FlowMatcher()
                         .withLinkCredit(Matchers.greaterThan(UnsignedInteger.ZERO))
-                        .withHandle(Matchers.notNullValue());
+                        .withHandle(Matchers.notNullValue())
+                        .withDrain(drainMatcher);
+
+        if(drain)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            flowMatcher.onSuccess(flowResponseSender);
+        }
 
         addHandler(flowMatcher);
     }
 
+    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+    }
+
+    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+        UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+        UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+        return dc.add(lc);
+    }
+
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                                  final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                                  final PropertiesDescribedType propertiesDescribedType,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org