You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/09 17:03:10 UTC
[7/9] qpid-jms git commit: consolidate handling of consumer flow
handling and subsequent transfer and drain flow responses
consolidate handling of consumer flow handling and subsequent transfer and drain flow responses
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b5e981c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b5e981c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b5e981c4
Branch: refs/heads/master
Commit: b5e981c49d11cc925a4d55ffe5b5d5bf8c00e5d2
Parents: 1b75b7e
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Dec 9 14:40:03 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Dec 9 14:40:03 2014 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 155 +++++++++++--------
1 file changed, 93 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5e981c4/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 0152cbf..cca5ab5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -707,61 +707,7 @@ public class TestAmqpPeer implements AutoCloseable
public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher)
{
- Matcher<Boolean> drainMatcher = null;
- if(drain)
- {
- drainMatcher = equalTo(true);
- }
- else
- {
- drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
- }
-
- final FlowMatcher flowMatcher = new FlowMatcher()
- .withLinkCredit(creditMatcher)
- .withHandle(Matchers.notNullValue())
- .withDrain(drainMatcher);
-
- if(drain && sendDrainFlowResponse)
- {
- final FlowFrame drainResponse = new FlowFrame();
- drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
- drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
- drainResponse.setLinkCredit(UnsignedInteger.ZERO);
- drainResponse.setDrain(true);
-
- // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
- flowResponseSender.setValueProvider(new ValueProvider()
- {
- @Override
- public void setValues()
- {
- flowResponseSender.setChannel(flowMatcher.getActualChannel());
- drainResponse.setHandle(calculateLinkHandle(flowMatcher));
- drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
- drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
- drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
- }
- });
-
- flowMatcher.onSuccess(flowResponseSender);
- }
-
- addHandler(flowMatcher);
- }
-
- private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
- UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
-
- return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
- }
-
- private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
- UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
- UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
-
- return dc.add(lc);
+ expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null);
}
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
@@ -770,7 +716,8 @@ public class TestAmqpPeer implements AutoCloseable
final ApplicationPropertiesDescribedType appPropertiesDescribedType,
final DescribedType content)
{
- expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, 1);
+ expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+ appPropertiesDescribedType, content, 1);
}
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
@@ -780,19 +727,54 @@ public class TestAmqpPeer implements AutoCloseable
final DescribedType content,
final int count)
{
- if(count <= 0)
+ expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
+ appPropertiesDescribedType, content, count, false, false,
+ Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1);
+ }
+
+ public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
+ final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+ final PropertiesDescribedType propertiesDescribedType,
+ final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+ final DescribedType content,
+ final int count,
+ final boolean drain,
+ final boolean sendDrainFlowResponse,
+ Matcher<UnsignedInteger> creditMatcher,
+ final Integer nextIncomingId)
+ {
+ if (nextIncomingId == null && count > 0)
+ {
+ throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested");
+ }
+
+ Matcher<Boolean> drainMatcher = null;
+ if(drain)
+ {
+ drainMatcher = equalTo(true);
+ }
+ else
{
- throw new IllegalArgumentException("Message count must be >= 1");
+ drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
}
- int nextIncomingId = 1; // TODO: we shouldn't assume this will be the first transfer on the session
+ Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
+ if(nextIncomingId != null)
+ {
+ remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
+ }
+ else
+ {
+ remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
+ }
final FlowMatcher flowMatcher = new FlowMatcher()
.withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)))
- .withDrain(Matchers.anyOf(equalTo(false), nullValue()))
- .withNextIncomingId(Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)));
+ .withDrain(drainMatcher)
+ .withNextIncomingId(remoteNextIncomingIdMatcher);
CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+ boolean addComposite = false;
for(int i = 0; i < count; i++)
{
@@ -822,14 +804,63 @@ public class TestAmqpPeer implements AutoCloseable
}
});
+ addComposite = true;
composite.add(transferResponseSender);
}
- flowMatcher.onSuccess(composite);
+ if(drain && sendDrainFlowResponse)
+ {
+ final FlowFrame drainResponse = new FlowFrame();
+ drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+ drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+ drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+ drainResponse.setDrain(true);
+
+ // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+ flowResponseSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ flowResponseSender.setChannel(flowMatcher.getActualChannel());
+ drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+ drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+ drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count));
+ drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+ }
+ });
+
+ addComposite = true;
+ composite.add(flowResponseSender);
+ }
+
+ if(addComposite) {
+ flowMatcher.onSuccess(composite);
+ }
addHandler(flowMatcher);
}
+ private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+ UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+ return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+ }
+
+ private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+ UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+ UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+ return dc.add(lc);
+ }
+
+ private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount) {
+ UnsignedInteger nid = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId();
+
+ return nid.add(UnsignedInteger.valueOf(sentCount));
+ }
+
private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType,
final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
final PropertiesDescribedType propertiesDescribedType,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org