You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/05 15:49:32 UTC
[2/3] qpid-jms git commit: add support for using drain to stop links
add support for using drain to stop links
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fdc8e54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fdc8e54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fdc8e54
Branch: refs/heads/master
Commit: 0fdc8e549b344e894e9ff5b54e9d09b31560afb3
Parents: f994b35
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 5 12:46:05 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 5 14:49:05 2014 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 61 +++++++++++++++++++-
1 file changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fdc8e54/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index e252345..fc9ec43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory;
// TODO should expectXXXYYYZZZ methods just be expect(matcher)?
public class TestAmqpPeer implements AutoCloseable
{
+ private static final int LINK_HANDLE_OFFSET = 100;
+
private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
private final TestAmqpPeerRunner _driverRunnable;
@@ -98,7 +100,7 @@ public class TestAmqpPeer implements AutoCloseable
*/
private CountDownLatch _handlersCompletedLatch;
- private volatile int _nextLinkHandle = 100;
+ private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
private byte[] _deferredBytes;
@@ -700,13 +702,68 @@ public class TestAmqpPeer implements AutoCloseable
public void expectLinkFlow()
{
+ expectLinkFlow(false);
+ }
+
+ public void expectLinkFlow(boolean drain)
+ {
+ Matcher<Boolean> drainMatcher = null;
+ if(drain)
+ {
+ drainMatcher = equalTo(true);
+ }
+ else
+ {
+ drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+ }
+
final FlowMatcher flowMatcher = new FlowMatcher()
.withLinkCredit(Matchers.greaterThan(UnsignedInteger.ZERO))
- .withHandle(Matchers.notNullValue());
+ .withHandle(Matchers.notNullValue())
+ .withDrain(drainMatcher);
+
+ if(drain)
+ {
+ final FlowFrame drainResponse = new FlowFrame();
+ drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+ drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+ drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+ drainResponse.setDrain(true);
+
+ // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+ flowResponseSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ flowResponseSender.setChannel(flowMatcher.getActualChannel());
+ drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+ drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+ drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
+ drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+ }
+ });
+
+ flowMatcher.onSuccess(flowResponseSender);
+ }
addHandler(flowMatcher);
}
+ private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+ UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+ return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+ }
+
+ private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+ UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+ UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+ return dc.add(lc);
+ }
+
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
final PropertiesDescribedType propertiesDescribedType,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org