You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/10 15:09:36 UTC

qpid-jms git commit: update stop process to handle presence of queued messages just in case, tidy up a little for clarity

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4e0c88fab -> 99dde7ac2


update stop process to handle presence of queued messages just in case, tidy up a little for clarity


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/99dde7ac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/99dde7ac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/99dde7ac

Branch: refs/heads/master
Commit: 99dde7ac2ecf2295ce84ee39e172d805119a67b2
Parents: 4e0c88f
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Dec 10 11:26:42 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Dec 10 11:26:42 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 50 +++++++++++---------
 1 file changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/99dde7ac/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 64cacc3..19b4560 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -78,7 +78,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private final AtomicLong _incomingSequence = new AtomicLong(0);
 
-    private AsyncResult drainRequest;
+    private AsyncResult stopRequest;
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
         super(info);
@@ -100,30 +100,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * Stops the consumer, using all link credit and waiting for in-flight messages to arrive.
      */
     public void stop(AsyncResult request) {
-        //TODO: We dont actually want the additional messages that could be sent while
-        // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
-        // of drain if it was supported. We would first need to understand what happens
-        // if we reduce credit below the number of messages already in-flight before
-        // the peer sees the update.
-
         Receiver receiver = getEndpoint();
-        if(receiver.getRemoteCredit() <= 0) {
-            // Sender already used all the credit on offer
-            request.onSuccess();
-        }
-        else{
-            drainRequest = request;
+        if (receiver.getRemoteCredit() <= 0) {
+            if (receiver.getQueued() == 0) {
+                // We have no remote credit and all the deliveries have been processed.
+                request.onSuccess();
+            } else {
+                // There are still deliveries to process, wait for them to be.
+                stopRequest = request;
+            }
+        } else {
+            //TODO: We dont actually want the additional messages that could be sent while
+            // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
+            // of drain if it was supported. We would first need to understand what happens
+            // if we reduce credit below the number of messages already in-flight before
+            // the peer sees the update.
+            stopRequest = request;
             receiver.drain(0);
         }
     }
 
     @Override
     public void processFlowUpdates() throws IOException {
-        if (drainRequest != null) {
+        // Check if we tried to stop and have now run out of credit, and
+        // processed all locally queued messages
+        if (stopRequest != null) {
             Receiver receiver = getEndpoint();
-            if (receiver.getDrain() && receiver.getRemoteCredit() <= 0) {
-                drainRequest.onSuccess();
-                drainRequest = null;
+            if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) {
+                stopRequest.onSuccess();
+                stopRequest = null;
             }
         }
 
@@ -360,12 +365,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     incoming = null;
                 }
             } else {
-                //We have exhausted the currently available messages on this link. Check if we tried to drain.
-                if(drainRequest != null) {
-                    if(getEndpoint().getDrain() && getEndpoint().getRemoteCredit() <= 0)
+                // We have exhausted the locally queued messages on this link.
+                // Check if we tried to stop and have now run out of credit.
+                if(stopRequest != null) {
+                    if(getEndpoint().getRemoteCredit() <= 0)
                     {
-                        drainRequest.onSuccess();
-                        drainRequest = null;
+                        stopRequest.onSuccess();
+                        stopRequest = null;
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org