You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/23 12:54:48 UTC

svn commit: r926530 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/qmf/ main/java/org/apache/qpid/server/configuration/ main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/

Author: robbie
Date: Tue Mar 23 11:54:48 2010
New Revision: 926530

URL: http://svn.apache.org/viewvc?rev=926530&view=rev
Log:
QPID-2379: add the queue UnackedMessage counts

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Tue Mar 23 11:54:48 2010
@@ -1004,14 +1004,12 @@ public class QMFService implements Confi
 
         public Long getUnackedMessages()
         {
-            // TODO
-            return 0l;
+            return _obj.getUnackedMessageCount();
         }
 
         public Long getUnackedMessagesHigh()
         {
-            // TODO
-            return 0l;
+            return _obj.getUnackedMessageCountHigh();
         }
 
         public Long getUnackedMessagesLow()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfig.java Tue Mar 23 11:54:48 2010
@@ -75,6 +75,10 @@ public interface QueueConfig extends Con
     long getPersistentMsgEnqueues();
 
     long getPersistentMsgDequeues();
+    
+    long getUnackedMessageCount();
+    
+    long getUnackedMessageCountHigh();
 
     void purge(long request);
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 23 11:54:48 2010
@@ -111,6 +111,7 @@ public interface AMQQueue extends Managa
 
     void dequeue(QueueEntry entry, Subscription sub);
 
+    void decrementUnackedMsgCount();
 
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 23 11:54:48 2010
@@ -226,22 +226,29 @@ public class QueueEntryImpl implements Q
 
     public void release()
     {
-        _stateUpdater.set(this,AVAILABLE_STATE);
-        if(!getQueue().isDeleted())
+        EntryState state = _state;
+        
+        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
-            getQueue().requeue(this);
-            if(_stateChangeListeners != null)
+            if(state instanceof SubscriptionAcquiredState)
             {
-                notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+                getQueue().decrementUnackedMsgCount();
             }
+            
+            if(!getQueue().isDeleted())
+            {
+                getQueue().requeue(this);
+                if(_stateChangeListeners != null)
+                {
+                    notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+                }
 
+            }
+            else if(acquire())
+            {
+                routeToAlternate();
+            }
         }
-        else if(acquire())
-        {
-            routeToAlternate();
-        }
-
-
     }
 
     public boolean releaseButRetain()
@@ -369,6 +376,7 @@ public class QueueEntryImpl implements Q
             Subscription s = null;
             if (state instanceof SubscriptionAcquiredState)
             {
+                getQueue().decrementUnackedMsgCount();
                 s = ((SubscriptionAcquiredState) state).getSubscription();
                 s.onDequeue(this);
             }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Mar 23 11:54:48 2010
@@ -127,6 +127,8 @@ public class SimpleAMQQueue implements A
     private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
     private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
     private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
+    private final AtomicLong _unackedMsgCount = new AtomicLong(0);
+    private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
 
     private final AtomicInteger _bindingCountHigh = new AtomicInteger();
 
@@ -693,6 +695,8 @@ public class SimpleAMQQueue implements A
             throws AMQException
     {
         _deliveredMessages.incrementAndGet();
+        incrementUnackedMsgCount();
+
         sub.send(entry);
 
         setLastSeenEntry(sub,entry);
@@ -2138,4 +2142,33 @@ public class SimpleAMQQueue implements A
     {
         return String.valueOf(getNameShortString());
     }
+
+    public long getUnackedMessageCountHigh()
+    {
+        return _unackedMsgCountHigh.get();
+    }
+    
+    public long getUnackedMessageCount()
+    {
+        return _unackedMsgCount.get();
+    }
+    
+    public void decrementUnackedMsgCount()
+    {
+        _unackedMsgCount.decrementAndGet();
+    }
+    
+    private void incrementUnackedMsgCount()
+    {
+        long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+        
+        long unackedMsgCountHigh;
+        while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
+        {
+            if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
+            {
+                break;
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=926530&r1=926529&r2=926530&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Mar 23 11:54:48 2010
@@ -572,4 +572,19 @@ public class MockAMQQueue implements AMQ
     {
         return 0;
     }
+
+    public void decrementUnackedMsgCount()
+    {
+
+    }
+
+    public long getUnackedMessageCount()
+    {
+        return 0;
+    }
+
+    public long getUnackedMessageCountHigh()
+    {
+        return 0;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org