You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/08 16:53:52 UTC

svn commit: r674848 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h

Author: gsim
Date: Tue Jul  8 07:53:51 2008
New Revision: 674848

URL: http://svn.apache.org/viewvc?rev=674848&view=rev
Log:
* release message lock when notifying queue listeners
* take copy of listeners
* remove unused functionality


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=674848&r1=674847&r2=674848&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul  8 07:53:51 2008
@@ -202,19 +202,6 @@
     return false;
 }
 
-/**
- * Return true if the message can be excluded. This is currently the
- * case if the queue is exclusive and has an exclusive consumer that
- * doesn't want the message or has a single consumer that doesn't want
- * the message (covers the JMS topic case).
- */
-bool Queue::canExcludeUnwanted()
-{
-    Mutex::ScopedLock locker(consumerLock);
-    return hasExclusiveOwner() && (exclusive || consumerCount == 1);
-}
-
-
 bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
 {
     if (c.preAcquires()) {
@@ -252,15 +239,8 @@
                 }
             } else {
                 //consumer will never want this message
-                if (canExcludeUnwanted()) {
-                    //hack for no-local on JMS topics; get rid of this message
-                    QPID_LOG(debug, "Excluding message from '" << name << "'");
-                    pop();
-                } else {
-                    //leave it for another consumer
-                    QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
-                    return false;
-                }
+                QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+                return false;
             } 
         }
     }
@@ -291,22 +271,35 @@
     return false;
 }
 
+/**
+ * notify listeners that there may be messages to process
+ */
 void Queue::notify()
 {
-    //notify listeners that there may be messages to process
-    for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+    if (listeners.empty()) return;
+
+    Listeners copy(listeners);
     listeners.clear();
+
+    sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy
+    {
+        Mutex::ScopedUnlock u(messageLock);
+        for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
+    }
 }
 
 void Queue::removeListener(Consumer& c)
 {
     Mutex::ScopedLock locker(messageLock);
-    listeners.erase(&c);
+    notifierLock.wait(messageLock);//wait until no notifies are in progress 
+    Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+    if (i != listeners.end()) listeners.erase(i);
 }
 
 void Queue::addListener(Consumer& c)
 {
-    listeners.insert(&c);
+    Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+    if (i == listeners.end()) listeners.push_back(&c);
 }
 
 bool Queue::dispatch(Consumer& c)
@@ -682,6 +675,27 @@
     }
 }
 
+/*
+ * Use of Guard requires an external lock to be held before calling
+ * any of its methods
+ */
+Queue::Guard::Guard() : count(0) {}
+
+void Queue::Guard::lock()
+{
+    count++;
+}
+
+void Queue::Guard::unlock()
+{
+    if (--count == 0) condition.notifyAll();
+}
+
+void Queue::Guard::wait(sys::Mutex& m)
+{
+    while (count) condition.wait(m);
+}
+
 ManagementObject::shared_ptr Queue::GetManagementObject (void) const
 {
     return dynamic_pointer_cast<ManagementObject> (mgmtObject);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=674848&r1=674847&r2=674848&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jul  8 07:53:51 2008
@@ -38,7 +38,6 @@
 #include <vector>
 #include <memory>
 #include <deque>
-#include <set>
 
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
@@ -62,9 +61,20 @@
          */
         class Queue : public boost::enable_shared_from_this<Queue>,
             public PersistableQueue, public management::Manageable {
-            typedef std::set<Consumer*> Listeners;
+            typedef qpid::InlineVector<Consumer*, 5> Listeners;
             typedef std::deque<QueuedMessage> Messages;
 
+            class Guard 
+            {
+                qpid::sys::Condition condition;
+                size_t count;
+              public:
+                Guard();
+                void lock();
+                void unlock();
+                void wait(sys::Mutex&);
+            };
+
             const string name;
             const bool autodelete;
             MessageStore* store;
@@ -79,6 +89,7 @@
             mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
             mutable qpid::sys::Mutex ownershipLock;
+            Guard notifierLock;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;
@@ -95,7 +106,6 @@
             bool getNextMessage(QueuedMessage& msg, Consumer& c);
             bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
             bool browseNextMessage(QueuedMessage& msg, Consumer& c);
-            bool canExcludeUnwanted();
 
             void notify();
             void removeListener(Consumer&);