You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/22 23:34:43 UTC

svn commit: r787412 - in /activemq/sandbox/activemq-flow: activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java

Author: chirino
Date: Mon Jun 22 21:34:42 2009
New Revision: 787412

URL: http://svn.apache.org/viewvc?rev=787412&view=rev
Log:
Fixes for the testQueueAckRemovesMessage test.



Modified:
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=787412&r1=787411&r2=787412&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Jun 22 21:34:42 2009
@@ -594,8 +594,10 @@
         public void ack(MessageAck info) {
             // TODO: The pending message queue could probably be optimized to
             // avoid having to create a new list here.
-            //if(info.isStandardAck())
-            {
+            if( info.isDeliveredAck() ) {
+                // This ack is just trying to expand the flow control window size.
+                limiter.onProtocolCredit(info.getMessageCount());
+            } else if(info.isStandardAck()) {
                 LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
                 synchronized (this) {
                     MessageId id = info.getLastMessageId();

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=787412&r1=787411&r2=787412&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon Jun 22 21:34:42 2009
@@ -699,7 +699,12 @@
 
             // If the sub doesn't remove on dispatch pass it the callback
             SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
-
+            // If the sub is a browser don't pass it a callback since it does not need to 
+            // delete messages
+            if( sub.isBrowser() ) { 
+                callback = null;
+            }
+            
             // See if the sink has room:
             qe.setAcquired(sub);
             if (sub.offer(qe.elem, this, callback)) {