You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/25 15:39:55 UTC

svn commit: r679801 - in /incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker: Queue.cpp Queue.h QueuePolicy.cpp QueuePolicy.h

Author: gsim
Date: Fri Jul 25 06:39:55 2008
New Revision: 679801

URL: http://svn.apache.org/viewvc?rev=679801&view=rev
Log:
Only reduce count and size maintained for queue plicy when messages are actually dequeued (i.e. acked).


Modified:
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.h
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.h

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.cpp?rev=679801&r1=679800&r2=679801&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.cpp Fri Jul 25 06:39:55 2008
@@ -250,7 +250,7 @@
             if (c.filter(msg.payload)) {
                 if (c.accept(msg.payload)) {            
                     m = msg;
-                    pop();
+                    messages.pop_front();
                     return true;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
@@ -262,7 +262,7 @@
                 if (canExcludeUnwanted()) {
                     //hack for no-local on JMS topics; get rid of this message
                     QPID_LOG(debug, "Excluding message from '" << name << "'");
-                    pop();
+                    messages.pop_front();
                 } else {
                     //leave it for another consumer
                     QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
@@ -385,7 +385,7 @@
 
     if(!messages.empty()){
         msg = messages.front();
-        pop();
+        messages.pop_front();
     }
     return msg;
 }
@@ -394,37 +394,11 @@
     Mutex::ScopedLock locker(messageLock);
     int count = messages.size();
     while(!messages.empty()) {
-        QueuedMessage& msg = messages.front();
-        if (store && msg.payload->isPersistent()) {
-            boost::intrusive_ptr<PersistableMessage> pmsg =
-                boost::static_pointer_cast<PersistableMessage>(msg.payload);
-            store->dequeue(0, pmsg, *this);
-        }
-        pop();
+        popAndDequeue();
     }
     return count;
 }
 
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
-    QueuedMessage& msg = messages.front();
-
-    if (policy.get()) policy->dequeued(msg.payload->contentSize());
-    if (mgmtObject.get() != 0){
-        Mutex::ScopedLock mutex(mgmtObject->getLock());
-        mgmtObject->inc_msgTotalDequeues  ();
-        mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
-        mgmtObject->dec_msgDepth ();
-        if (msg.payload->isPersistent ()){
-            mgmtObject->inc_msgPersistDequeues ();
-            mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
-        }
-    }
-    messages.pop_front();
-}
-
 void Queue::push(boost::intrusive_ptr<Message>& msg){
     Mutex::ScopedLock locker(messageLock);   
     messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -441,7 +415,7 @@
             } else {
                 QPID_LOG(error, "Message " << msg << " on " << name
                          << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
-                throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+                throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
             }
         } else {
             if (policyExceeded) {
@@ -495,6 +469,10 @@
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
 {
+    {
+        Mutex::ScopedLock locker(messageLock);
+        dequeued(msg);
+    }
     if (msg->isPersistent() && store) {
         msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -505,6 +483,34 @@
     return false;
 }
 
+/**
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+    boost::intrusive_ptr<Message> msg = messages.front().payload;
+    messages.pop_front();
+    dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+    if (policy.get()) policy->dequeued(msg->contentSize());
+    if (mgmtObject != 0){
+        mgmtObject->inc_msgTotalDequeues  ();
+        mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+        if (msg->isPersistent ()){
+            mgmtObject->inc_msgPersistDequeues ();
+            mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+        }
+    }
+}
+
 
 namespace 
 {
@@ -554,7 +560,7 @@
             DeliverableMessage msg(messages.front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      msg.getMessage().getApplicationHeaders());
-            pop();
+            popAndDequeue();
         }
         alternateExchange->decAlternateUsers();
     }

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.h?rev=679801&r1=679800&r2=679801&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/Queue.h Fri Jul 25 06:39:55 2008
@@ -88,7 +88,6 @@
             framing::SequenceNumber sequence;
             management::Queue::shared_ptr mgmtObject;
 
-            void pop();
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             bool seek(QueuedMessage& msg, Consumer& position);
@@ -103,6 +102,9 @@
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
+            void dequeued(boost::intrusive_ptr<Message>& msg);
+            void popAndDequeue();
+
         public:
             virtual void notifyDurableIOComplete();
             typedef boost::shared_ptr<Queue> shared_ptr;
@@ -169,6 +171,7 @@
              * dequeue from store (only done once messages is acknowledged)
              */
             bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+
             /**
              * dequeues from memory only
              */

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.cpp?rev=679801&r1=679800&r2=679801&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.cpp Fri Jul 25 06:39:55 2008
@@ -71,3 +71,18 @@
 const std::string QueuePolicy::maxSizeKey("qpid.max_size");
 uint64_t QueuePolicy::defaultMaxSize(0);
 
+namespace qpid {
+    namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
+{
+    if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
+    else out << "size unlimited, current=" << p.size;
+    out << "; ";
+    if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
+    else out << "count unlimited, current=" << p.count;    
+    return out;
+}
+
+    }
+}

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.h?rev=679801&r1=679800&r2=679801&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/broker/QueuePolicy.h Fri Jul 25 06:39:55 2008
@@ -21,6 +21,7 @@
 #ifndef _QueuePolicy_
 #define _QueuePolicy_
 
+#include <iostream>
 #include "qpid/framing/FieldTable.h"
 
 namespace qpid {
@@ -50,6 +51,7 @@
             uint64_t getMaxSize() const { return maxSize; }           
 
             static void setDefaultMaxSize(uint64_t);
+	    friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
         };
     }
 }