You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/22 23:55:54 UTC
svn commit: r787419 -
/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Author: chirino
Date: Mon Jun 22 21:55:53 2009
New Revision: 787419
URL: http://svn.apache.org/viewvc?rev=787419&view=rev
Log:
better delivered ack handling.
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=787419&r1=787418&r2=787419&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Jun 22 21:55:53 2009
@@ -522,6 +522,7 @@
private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
private BrokerSubscription brokerSubscription;
+ private int borrowedLimterCredits;
public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception {
super(info.getConsumerId().toString(), parent);
@@ -594,14 +595,18 @@
public void ack(MessageAck info) {
// TODO: The pending message queue could probably be optimized to
// avoid having to create a new list here.
+ int flowCredit = info.getMessageCount();
if( info.isDeliveredAck() ) {
- // This ack is just trying to expand the flow control window size.
- limiter.onProtocolCredit(info.getMessageCount());
+ // This ack is just trying to expand the flow control window size without actually
+ // acking the message. Keep track of how many limiter credits we borrow since they need
+ // to get paid back with real acks later.
+ borrowedLimterCredits += flowCredit;
+ limiter.onProtocolCredit(flowCredit);
} else if(info.isStandardAck()) {
LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
synchronized (this) {
MessageId id = info.getLastMessageId();
- if (isDurable() || isQueueReceiver())
+ if (isDurable() || isQueueReceiver()) {
while (!pendingMessageIds.isEmpty()) {
MessageId pendingId = pendingMessageIds.getFirst();
SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
@@ -611,7 +616,22 @@
break;
}
}
- limiter.onProtocolCredit(info.getMessageCount());
+ }
+
+ // Did we have DeliveredAcks previously sent? Then the
+ // the flow window has already been credited. We need to
+ // pay back the borrowed limiter credits before giving
+ // credits directly to the limiter.
+ if(borrowedLimterCredits>0) {
+ if( flowCredit > borrowedLimterCredits ) {
+ flowCredit -= borrowedLimterCredits;
+ borrowedLimterCredits=0;
+ } else {
+ borrowedLimterCredits -= flowCredit;
+ flowCredit=0;
+ }
+ }
+ limiter.onProtocolCredit(flowCredit);
}
// Delete outside of synchronization on queue to avoid contention