You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/22 23:55:54 UTC

svn commit: r787419 - /activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Author: chirino
Date: Mon Jun 22 21:55:53 2009
New Revision: 787419

URL: http://svn.apache.org/viewvc?rev=787419&view=rev
Log:
better delivered ack handling.

Modified:
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=787419&r1=787418&r2=787419&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Jun 22 21:55:53 2009
@@ -522,6 +522,7 @@
         private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
         private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
         private BrokerSubscription brokerSubscription;
+        private int borrowedLimterCredits;
 
         public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception {
             super(info.getConsumerId().toString(), parent);
@@ -594,14 +595,18 @@
         public void ack(MessageAck info) {
             // TODO: The pending message queue could probably be optimized to
             // avoid having to create a new list here.
+            int flowCredit = info.getMessageCount();
             if( info.isDeliveredAck() ) {
-                // This ack is just trying to expand the flow control window size.
-                limiter.onProtocolCredit(info.getMessageCount());
+                // This ack is just trying to expand the flow control window size without actually 
+                // acking the message.  Keep track of how many limiter credits we borrow since they need
+                // to get paid back with real acks later.
+                borrowedLimterCredits += flowCredit;
+                limiter.onProtocolCredit(flowCredit);
             } else if(info.isStandardAck()) {
                 LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
                 synchronized (this) {
                     MessageId id = info.getLastMessageId();
-                    if (isDurable() || isQueueReceiver())
+                    if (isDurable() || isQueueReceiver()) {
                         while (!pendingMessageIds.isEmpty()) {
                             MessageId pendingId = pendingMessageIds.getFirst();
                             SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
@@ -611,7 +616,22 @@
                                 break;
                             }
                         }
-                    limiter.onProtocolCredit(info.getMessageCount());
+                    }
+                    
+                    // Did we have DeliveredAcks previously sent?  Then the 
+                    // the flow window has already been credited.  We need to 
+                    // pay back the borrowed limiter credits before giving 
+                    // credits directly to the limiter.
+                    if(borrowedLimterCredits>0) {
+                        if( flowCredit > borrowedLimterCredits ) {
+                            flowCredit -= borrowedLimterCredits;
+                            borrowedLimterCredits=0;
+                        } else {
+                            borrowedLimterCredits -= flowCredit;
+                            flowCredit=0;
+                        }
+                    }
+                    limiter.onProtocolCredit(flowCredit);
                 }
 
                 // Delete outside of synchronization on queue to avoid contention