You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/10 15:09:36 UTC
qpid-jms git commit: update stop process to handle presence of queued
messages just in case, tidy up a little for clarity
Repository: qpid-jms
Updated Branches:
refs/heads/master 4e0c88fab -> 99dde7ac2
update stop process to handle presence of queued messages just in case, tidy up a little for clarity
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/99dde7ac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/99dde7ac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/99dde7ac
Branch: refs/heads/master
Commit: 99dde7ac2ecf2295ce84ee39e172d805119a67b2
Parents: 4e0c88f
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Dec 10 11:26:42 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Dec 10 11:26:42 2014 +0000
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 50 +++++++++++---------
1 file changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/99dde7ac/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 64cacc3..19b4560 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -78,7 +78,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private final AtomicLong _incomingSequence = new AtomicLong(0);
- private AsyncResult drainRequest;
+ private AsyncResult stopRequest;
public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
super(info);
@@ -100,30 +100,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* Stops the consumer, using all link credit and waiting for in-flight messages to arrive.
*/
public void stop(AsyncResult request) {
- //TODO: We dont actually want the additional messages that could be sent while
- // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
- // of drain if it was supported. We would first need to understand what happens
- // if we reduce credit below the number of messages already in-flight before
- // the peer sees the update.
-
Receiver receiver = getEndpoint();
- if(receiver.getRemoteCredit() <= 0) {
- // Sender already used all the credit on offer
- request.onSuccess();
- }
- else{
- drainRequest = request;
+ if (receiver.getRemoteCredit() <= 0) {
+ if (receiver.getQueued() == 0) {
+ // We have no remote credit and all the deliveries have been processed.
+ request.onSuccess();
+ } else {
+ // There are still deliveries to process, wait for them to be.
+ stopRequest = request;
+ }
+ } else {
+ //TODO: We dont actually want the additional messages that could be sent while
+ // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
+ // of drain if it was supported. We would first need to understand what happens
+ // if we reduce credit below the number of messages already in-flight before
+ // the peer sees the update.
+ stopRequest = request;
receiver.drain(0);
}
}
@Override
public void processFlowUpdates() throws IOException {
- if (drainRequest != null) {
+ // Check if we tried to stop and have now run out of credit, and
+ // processed all locally queued messages
+ if (stopRequest != null) {
Receiver receiver = getEndpoint();
- if (receiver.getDrain() && receiver.getRemoteCredit() <= 0) {
- drainRequest.onSuccess();
- drainRequest = null;
+ if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) {
+ stopRequest.onSuccess();
+ stopRequest = null;
}
}
@@ -360,12 +365,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
incoming = null;
}
} else {
- //We have exhausted the currently available messages on this link. Check if we tried to drain.
- if(drainRequest != null) {
- if(getEndpoint().getDrain() && getEndpoint().getRemoteCredit() <= 0)
+ // We have exhausted the locally queued messages on this link.
+ // Check if we tried to stop and have now run out of credit.
+ if(stopRequest != null) {
+ if(getEndpoint().getRemoteCredit() <= 0)
{
- drainRequest.onSuccess();
- drainRequest = null;
+ stopRequest.onSuccess();
+ stopRequest = null;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org