You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/12 11:36:09 UTC
svn commit: r685104 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/server/txn/
systests/src/main/java/org/apache/qpid/server...
Author: ritchiem
Date: Tue Aug 12 02:36:08 2008
New Revision: 685104
URL: http://svn.apache.org/viewvc?rev=685104&view=rev
Log:
QPID-1136 : Fixed the Flow Control problem introduced by this change, and added a test to validate that Flow Control is operating correctly
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Tue Aug 12 02:36:08 2008
@@ -116,7 +116,6 @@
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- msg.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue Aug 12 02:36:08 2008
@@ -94,7 +94,7 @@
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
- message.restoreCredit();
+
}
return message;
@@ -185,8 +185,6 @@
_unackedSize -= unacked.getValue().getMessage().getSize();
- unacked.getValue().restoreCredit();
-
if (unacked.getKey() == deliveryTag)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Aug 12 02:36:08 2008
@@ -175,8 +175,6 @@
void dispose(final StoreContext storeContext) throws MessageCleanupException;
- void restoreCredit();
-
void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
boolean isQueueDeleted();
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Aug 12 02:36:08 2008
@@ -256,6 +256,12 @@
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+ s.restoreCredit(this);
+ }
+
getQueue().dequeue(storeContext, this);
if(_stateChangeListeners != null)
{
@@ -282,16 +288,6 @@
}
}
- public void restoreCredit()
- {
- EntryState state = _state;
- if(state instanceof SubscriptionAcquiredState)
- {
- Subscription s = ((SubscriptionAcquiredState) _state).getSubscription();
- s.restoreCredit(this);
- }
- }
-
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
{
//if the queue is null then the message is waiting to be acked, but has been removed.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Aug 12 02:36:08 2008
@@ -134,7 +134,6 @@
{
beginTranIfNecessary();
}
- message.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=685104&r1=685103&r2=685104&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Aug 12 02:36:08 2008
@@ -32,6 +32,7 @@
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
@@ -301,6 +302,31 @@
}
}
+ /**
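+ * Regression test for a flow control problem introduced by the QPID-1136 change: verifies that credit is available again once a delivered message has been acknowledged.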
+ *
+ * @throws Exception
+ */
+ public void testMessageDequeueRestoresCreditTest() throws Exception
+ {
+ // Create a subscription using a Pre0_10CreditManager and publish a single message (msgCount = 1)
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
+ DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+ final int msgCount = 1;
+ publishMessages(msgCount);
+
+ _queue.deliverAsync(_subscription);
+
+ _channel.acknowledgeMessage(1, false);
+
+ // Check credit available
+ assertTrue("No credit available", creditManager.hasCredit());
+
+ }
+
+
/*
public void testPrefetchHighLow() throws AMQException
{