You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/23 12:54:48 UTC
svn commit: r926530 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/qmf/
main/java/org/apache/qpid/server/configuration/
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/queue/
Author: robbie
Date: Tue Mar 23 11:54:48 2010
New Revision: 926530
URL: http://svn.apache.org/viewvc?rev=926530&view=rev
Log:
QPID-2379: add the queue UnackedMessage counts
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Mar 23 11:54:48 2010
@@ -1004,14 +1004,12 @@ public class QMFService implements Confi
public Long getUnackedMessages()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCount();
}
public Long getUnackedMessagesHigh()
{
- // TODO
- return 0l;
+ return _obj.getUnackedMessageCountHigh();
}
public Long getUnackedMessagesLow()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Tue Mar 23 11:54:48 2010
@@ -75,6 +75,10 @@ public interface QueueConfig extends Con
long getPersistentMsgEnqueues();
long getPersistentMsgDequeues();
+
+ long getUnackedMessageCount();
+
+ long getUnackedMessageCountHigh();
void purge(long request);
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 23 11:54:48 2010
@@ -111,6 +111,7 @@ public interface AMQQueue extends Managa
void dequeue(QueueEntry entry, Subscription sub);
+ void decrementUnackedMsgCount();
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 23 11:54:48 2010
@@ -226,22 +226,29 @@ public class QueueEntryImpl implements Q
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
- if(!getQueue().isDeleted())
+ EntryState state = _state;
+
+ if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(state instanceof SubscriptionAcquiredState)
{
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ getQueue().decrementUnackedMsgCount();
}
+
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ }
+ else if(acquire())
+ {
+ routeToAlternate();
+ }
}
- else if(acquire())
- {
- routeToAlternate();
- }
-
-
}
public boolean releaseButRetain()
@@ -369,6 +376,7 @@ public class QueueEntryImpl implements Q
Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
+ getQueue().decrementUnackedMsgCount();
s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Mar 23 11:54:48 2010
@@ -127,6 +127,8 @@ public class SimpleAMQQueue implements A
private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCount = new AtomicLong(0);
+ private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -693,6 +695,8 @@ public class SimpleAMQQueue implements A
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ incrementUnackedMsgCount();
+
sub.send(entry);
setLastSeenEntry(sub,entry);
@@ -2138,4 +2142,33 @@ public class SimpleAMQQueue implements A
{
return String.valueOf(getNameShortString());
}
+
+ public long getUnackedMessageCountHigh()
+ {
+ return _unackedMsgCountHigh.get();
+ }
+
+ public long getUnackedMessageCount()
+ {
+ return _unackedMsgCount.get();
+ }
+
+ public void decrementUnackedMsgCount()
+ {
+ _unackedMsgCount.decrementAndGet();
+ }
+
+ private void incrementUnackedMsgCount()
+ {
+ long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+
+ long unackedMsgCountHigh;
+ while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
+ {
+ if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
+ {
+ break;
+ }
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Mar 23 11:54:48 2010
@@ -572,4 +572,19 @@ public class MockAMQQueue implements AMQ
{
return 0;
}
+
+ public void decrementUnackedMsgCount()
+ {
+
+ }
+
+ public long getUnackedMessageCount()
+ {
+ return 0;
+ }
+
+ public long getUnackedMessageCountHigh()
+ {
+ return 0;
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org