You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/12 11:36:09 UTC

svn commit: r685104 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/txn/ systests/src/main/java/org/apache/qpid/server...

Author: ritchiem
Date: Tue Aug 12 02:36:08 2008
New Revision: 685104

URL: http://svn.apache.org/viewvc?rev=685104&view=rev
Log:
QPID-1136 : Fixed a Flow Control problem introduced by the earlier QPID-1136 change, and added a test to validate that Flow Control is operating correctly

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Tue Aug 12 02:36:08 2008
@@ -116,7 +116,6 @@
         //make persistent changes, i.e. dequeue and decrementReference
         for (QueueEntry msg : _unacked.values())
         {
-            msg.restoreCredit();
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(storeContext);
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue Aug 12 02:36:08 2008
@@ -94,7 +94,7 @@
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
-                message.restoreCredit();
+
             }
 
             return message;
@@ -185,8 +185,6 @@
 
                 _unackedSize -= unacked.getValue().getMessage().getSize();
 
-                unacked.getValue().restoreCredit();
-
 
                 if (unacked.getKey() == deliveryTag)
                 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Aug 12 02:36:08 2008
@@ -175,8 +175,6 @@
 
     void dispose(final StoreContext storeContext) throws MessageCleanupException;
 
-    void restoreCredit();
-
     void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
 
     boolean isQueueDeleted();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Aug 12 02:36:08 2008
@@ -256,6 +256,12 @@
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
+            if (state instanceof SubscriptionAcquiredState)
+            {
+                Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+                s.restoreCredit(this);
+            }
+
             getQueue().dequeue(storeContext, this);
             if(_stateChangeListeners != null)
             {
@@ -282,16 +288,6 @@
         }
     }
 
-    public void restoreCredit()
-    {
-        EntryState state = _state;
-        if(state instanceof SubscriptionAcquiredState)
-        {
-            Subscription s = ((SubscriptionAcquiredState) _state).getSubscription();
-            s.restoreCredit(this);
-        }
-    }
-
     public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
     {
         //if the queue is null then the message is waiting to be acked, but has been removed.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Aug 12 02:36:08 2008
@@ -134,7 +134,6 @@
                         {
                             beginTranIfNecessary();
                         }
-                        message.restoreCredit();
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
                         message.discard(_storeContext);
 

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Aug 12 02:36:08 2008
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
@@ -301,6 +302,31 @@
         }
     }
 
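+    /**
+     * Regression test for the flow control problem exposed by the QPID-1136 fix: once a delivered message is acknowledged, the subscription must have credit available again.
+     *
+     * @throws Exception on any unexpected failure during the test
+     */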
+    public void testMessageDequeueRestoresCreditTest() throws Exception
+    {
+        // Send a single message (see msgCount below) to a subscription that uses a Pre0_10CreditManager
+        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
+                                                                            DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+        final int msgCount = 1;
+        publishMessages(msgCount);
+
+        _queue.deliverAsync(_subscription);
+
+        _channel.acknowledgeMessage(1, false);
+
+        // Check credit available
+        assertTrue("No credit available", creditManager.hasCredit());
+
+    }
+
+
 /*
     public void testPrefetchHighLow() throws AMQException
     {