You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/25 15:50:32 UTC

svn commit: r679805 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/QueuePolicy.cpp qpid/broker/QueuePolicy.h qpid/broker/SemanticState.cpp tests/QueueTest.cpp tests/TxPublishTest.cpp

Author: gsim
Date: Fri Jul 25 06:50:32 2008
New Revision: 679805

URL: http://svn.apache.org/viewvc?rev=679805&view=rev
Log:
Only reduce count and size maintained for queue policy when messages are actually dequeued (i.e. acked).


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jul 25 06:50:32 2008
@@ -230,7 +230,7 @@
             if (c.filter(msg.payload)) {
                 if (c.accept(msg.payload)) {            
                     m = msg;
-                    pop();
+                    messages.pop_front();
                     return true;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
@@ -361,13 +361,13 @@
         mgmtObject->dec_consumerCount ();
 }
 
-QueuedMessage Queue::dequeue(){
+QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
 
     if(!messages.empty()){
         msg = messages.front();
-        pop();
+        messages.pop_front();
     }
     return msg;
 }
@@ -376,35 +376,11 @@
     Mutex::ScopedLock locker(messageLock);
     int count = messages.size();
     while(!messages.empty()) {
-        QueuedMessage& msg = messages.front();
-        if (store && msg.payload->isPersistent()) {
-            boost::intrusive_ptr<PersistableMessage> pmsg =
-                boost::static_pointer_cast<PersistableMessage>(msg.payload);
-            store->dequeue(0, pmsg, *this);
-        }
-        pop();
+        popAndDequeue();
     }
     return count;
 }
 
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
-    QueuedMessage& msg = messages.front();
-
-    if (policy.get()) policy->dequeued(msg.payload->contentSize());
-    if (mgmtObject != 0){
-        mgmtObject->inc_msgTotalDequeues  ();
-        mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
-        if (msg.payload->isPersistent ()){
-            mgmtObject->inc_msgPersistDequeues ();
-            mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
-        }
-    }
-    messages.pop_front();
-}
-
 void Queue::push(boost::intrusive_ptr<Message>& msg){
     Mutex::ScopedLock locker(messageLock);   
     messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -421,7 +397,7 @@
             } else {
                 QPID_LOG(error, "Message " << msg << " on " << name
                          << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
-                throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+                throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
             }
         } else {
             if (policyExceeded) {
@@ -475,6 +451,10 @@
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
 {
+    {
+        Mutex::ScopedLock locker(messageLock);
+        dequeued(msg);
+    }
     if (msg->isPersistent() && store) {
         msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -485,6 +465,34 @@
     return false;
 }
 
+/**
+ * Removes a message from the in-memory delivery queue as well as
+ * dequeuing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+    boost::intrusive_ptr<Message> msg = messages.front().payload;
+    messages.pop_front();
+    dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+    if (policy.get()) policy->dequeued(msg->contentSize());
+    if (mgmtObject != 0){
+        mgmtObject->inc_msgTotalDequeues  ();
+        mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+        if (msg->isPersistent ()){
+            mgmtObject->inc_msgPersistDequeues ();
+            mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+        }
+    }
+}
+
 
 namespace 
 {
@@ -534,7 +542,7 @@
             DeliverableMessage msg(messages.front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      msg.getMessage().getApplicationHeaders());
-            pop();
+            popAndDequeue();
         }
         alternateExchange->decAlternateUsers();
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jul 25 06:50:32 2008
@@ -98,7 +98,6 @@
             framing::SequenceNumber sequence;
             management::Queue* mgmtObject;
 
-            void pop();
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             bool seek(QueuedMessage& msg, Consumer& position);
@@ -112,6 +111,9 @@
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
+            void dequeued(boost::intrusive_ptr<Message>& msg);
+            void popAndDequeue();
+
         public:
             virtual void notifyDurableIOComplete();
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -178,10 +180,11 @@
              * dequeue from store (only done once messages is acknowledged)
              */
             bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+
             /**
-             * dequeues from memory only
+             * Gets the next available message 
              */
-            QueuedMessage dequeue();
+            QueuedMessage get();
 
             const QueuePolicy* getPolicy();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Fri Jul 25 06:50:32 2008
@@ -71,3 +71,18 @@
 const std::string QueuePolicy::maxSizeKey("qpid.max_size");
 uint64_t QueuePolicy::defaultMaxSize(0);
 
+namespace qpid {
+    namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
+{
+    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
+    else out << "size unlimited, current=" << p.size;
+    out << "; ";
+    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
+    else out << "count unlimited, current=" << p.count;    
+    return out;
+}
+
+    }
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Fri Jul 25 06:50:32 2008
@@ -21,6 +21,7 @@
 #ifndef _QueuePolicy_
 #define _QueuePolicy_
 
+#include <iostream>
 #include "qpid/framing/FieldTable.h"
 
 namespace qpid {
@@ -50,6 +51,7 @@
             uint64_t getMaxSize() const { return maxSize; }           
 
             static void setDefaultMaxSize(uint64_t);
+	    friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jul 25 06:50:32 2008
@@ -429,7 +429,7 @@
 
 bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
 {
-    QueuedMessage msg = queue->dequeue();
+    QueuedMessage msg = queue->get();
     if(msg.payload){
         DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
         if(ackExpected){

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Jul 25 06:50:32 2008
@@ -91,7 +91,7 @@
     BOOST_CHECK(!c1.received);
     msg1->enqueueComplete();
     
-    received = queue->dequeue().payload;
+    received = queue->get().payload;
     BOOST_CHECK_EQUAL(msg1.get(), received.get());    
 }
     
@@ -179,11 +179,11 @@
     
     BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
     
-    received = queue->dequeue().payload;
+    received = queue->get().payload;
     BOOST_CHECK_EQUAL(msg1.get(), received.get());
     BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
 
-    received = queue->dequeue().payload;
+    received = queue->get().payload;
     BOOST_CHECK_EQUAL(msg2.get(), received.get());
     BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
 
@@ -196,7 +196,7 @@
     BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
     BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
 
-    received = queue->dequeue().payload;
+    received = queue->get().payload;
     BOOST_CHECK(!received);
     BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
         

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Fri Jul 25 06:50:32 2008
@@ -82,13 +82,13 @@
     t.op.prepare(0);
     t.op.commit();
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
-    intrusive_ptr<Message> msg_dequeue = t.queue1->dequeue().payload;
+    intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
 
     BOOST_CHECK_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
     BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
 
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
-    BOOST_CHECK_EQUAL(t.msg, t.queue2->dequeue().payload);            
+    BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload);            
 }
 
 QPID_AUTO_TEST_SUITE_END()