You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/27 15:35:43 UTC

qpid-jms git commit: QPIDJMS-92: some tweaks and comments for clarity

Repository: qpid-jms
Updated Branches:
  refs/heads/master cf492a102 -> b42f12ea2


QPIDJMS-92: some tweaks and comments for clarity


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b42f12ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b42f12ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b42f12ea

Branch: refs/heads/master
Commit: b42f12ea2a8005286fe92181d47616507cd2851a
Parents: cf492a1
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Aug 27 14:34:49 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Aug 27 14:34:49 2015 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b42f12ea/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 91c3233..ab8c2f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -97,7 +97,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     private final AtomicLong incomingSequence = new AtomicLong(0);
 
     private AsyncResult stopRequest;
-    private AsyncResult pullRequest;
+    private PullRequest pullRequest;
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
         super(info);
@@ -151,6 +151,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
         }
 
+        // Check if we tried to pull a message and whether we got it
         if (pullRequest != null) {
             Receiver receiver = getEndpoint();
             if (receiver.getRemoteCredit() <= 0) {
@@ -412,11 +413,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             if (timeout < 0) {
                 getEndpoint().flow(1);
             } else if (timeout == 0) {
-                pullRequest = new DrainingPullRequest();
+                pullRequest = new PullRequest();
                 getEndpoint().drain(1);
             } else if (timeout > 0) {
-                // We need to turn off the credit and signal the consumer
-                // that there was no message.
+                // We need to drain the credit if no message arrives. If that
+                // happens, processing completion of the drain attempt will signal
+                // the consumer and clear the pullRequest.
                 final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
 
                     @Override
@@ -459,7 +461,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 }
             } else {
                 // We have exhausted the locally queued messages on this link.
-                // Check if we tried to stop and have now run out of credit.
+                // Check if we tried to stop/pull and have now run out of credit.
                 if (getEndpoint().getRemoteCredit() <= 0) {
                     if (stopRequest != null) {
                         stopRequest.onSuccess();
@@ -467,6 +469,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     }
 
                     if (pullRequest != null) {
+                        // Failure as we didn't get the desired message
                         pullRequest.onFailure(null);
                         pullRequest = null;
                     }
@@ -587,7 +590,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             if (envelope.getMessage() != null) {
                 LOG.debug("Dispatching received message: {}", envelope);
             } else {
-                LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId());
+                LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
             }
             listener.onInboundMessage(envelope);
         } else {
@@ -633,12 +636,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     public void postRollback() throws Exception {
     }
 
-    private class DrainingPullRequest implements AsyncResult {
+    private class PullRequest implements AsyncResult {
 
         @Override
         public void onFailure(Throwable result) {
             JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             pullDone.setConsumerId(getConsumerId());
+            // Lack of setMessage on the dispatch is taken as signal no message arrived.
             try {
                 deliver(pullDone);
             } catch (Exception e) {
@@ -657,7 +661,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
     }
 
-    private class TimedPullRequest extends DrainingPullRequest {
+    private class TimedPullRequest extends PullRequest {
 
         private final ScheduledFuture<?> completionTask;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org