You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/27 15:35:43 UTC
qpid-jms git commit: QPIDJMS-92: some tweaks and comments for clarity
Repository: qpid-jms
Updated Branches:
refs/heads/master cf492a102 -> b42f12ea2
QPIDJMS-92: some tweaks and comments for clarity
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b42f12ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b42f12ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b42f12ea
Branch: refs/heads/master
Commit: b42f12ea2a8005286fe92181d47616507cd2851a
Parents: cf492a1
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Aug 27 14:34:49 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Aug 27 14:34:49 2015 +0100
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b42f12ea/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 91c3233..ab8c2f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -97,7 +97,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private final AtomicLong incomingSequence = new AtomicLong(0);
private AsyncResult stopRequest;
- private AsyncResult pullRequest;
+ private PullRequest pullRequest;
public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
super(info);
@@ -151,6 +151,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
+ // Check if we tried to pull a message and whether we got it
if (pullRequest != null) {
Receiver receiver = getEndpoint();
if (receiver.getRemoteCredit() <= 0) {
@@ -412,11 +413,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (timeout < 0) {
getEndpoint().flow(1);
} else if (timeout == 0) {
- pullRequest = new DrainingPullRequest();
+ pullRequest = new PullRequest();
getEndpoint().drain(1);
} else if (timeout > 0) {
- // We need to turn off the credit and signal the consumer
- // that there was no message.
+ // We need to drain the credit if no message arrives. If that
+ // happens, processing completion of the drain attempt will signal
+ // the consumer and clear the pullRequest.
final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
@Override
@@ -459,7 +461,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
} else {
// We have exhausted the locally queued messages on this link.
- // Check if we tried to stop and have now run out of credit.
+ // Check if we tried to stop/pull and have now run out of credit.
if (getEndpoint().getRemoteCredit() <= 0) {
if (stopRequest != null) {
stopRequest.onSuccess();
@@ -467,6 +469,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
if (pullRequest != null) {
+ // Failure as we didn't get the desired message
pullRequest.onFailure(null);
pullRequest = null;
}
@@ -587,7 +590,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (envelope.getMessage() != null) {
LOG.debug("Dispatching received message: {}", envelope);
} else {
- LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId());
+ LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId());
}
listener.onInboundMessage(envelope);
} else {
@@ -633,12 +636,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void postRollback() throws Exception {
}
- private class DrainingPullRequest implements AsyncResult {
+ private class PullRequest implements AsyncResult {
@Override
public void onFailure(Throwable result) {
JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
pullDone.setConsumerId(getConsumerId());
+ // Lack of setMessage on the dispatch is taken as signal no message arrived.
try {
deliver(pullDone);
} catch (Exception e) {
@@ -657,7 +661,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
- private class TimedPullRequest extends DrainingPullRequest {
+ private class TimedPullRequest extends PullRequest {
private final ScheduledFuture<?> completionTask;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org