You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/08 16:53:52 UTC
svn commit: r674848 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker:
Queue.cpp Queue.h
Author: gsim
Date: Tue Jul 8 07:53:51 2008
New Revision: 674848
URL: http://svn.apache.org/viewvc?rev=674848&view=rev
Log:
* release the message lock while notifying queue listeners
* notify from a copy of the listener list
* remove unused functionality (Queue::canExcludeUnwanted)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=674848&r1=674847&r2=674848&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul 8 07:53:51 2008
@@ -202,19 +202,6 @@
return false;
}
-/**
- * Return true if the message can be excluded. This is currently the
- * case if the queue is exclusive and has an exclusive consumer that
- * doesn't want the message or has a single consumer that doesn't want
- * the message (covers the JMS topic case).
- */
-bool Queue::canExcludeUnwanted()
-{
- Mutex::ScopedLock locker(consumerLock);
- return hasExclusiveOwner() && (exclusive || consumerCount == 1);
-}
-
-
bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
{
if (c.preAcquires()) {
@@ -252,15 +239,8 @@
}
} else {
//consumer will never want this message
- if (canExcludeUnwanted()) {
- //hack for no-local on JMS topics; get rid of this message
- QPID_LOG(debug, "Excluding message from '" << name << "'");
- pop();
- } else {
- //leave it for another consumer
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
- }
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
}
}
}
@@ -291,22 +271,35 @@
return false;
}
+/**
+ * notify listeners that there may be messages to process
+ */
void Queue::notify()
{
- //notify listeners that there may be messages to process
- for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+ if (listeners.empty()) return;
+
+ Listeners copy(listeners);
listeners.clear();
+
+ sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy
+ {
+ Mutex::ScopedUnlock u(messageLock);
+ for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
+ }
}
void Queue::removeListener(Consumer& c)
{
Mutex::ScopedLock locker(messageLock);
- listeners.erase(&c);
+ notifierLock.wait(messageLock);//wait until no notifies are in progress
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i != listeners.end()) listeners.erase(i);
}
void Queue::addListener(Consumer& c)
{
- listeners.insert(&c);
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i == listeners.end()) listeners.push_back(&c);
}
bool Queue::dispatch(Consumer& c)
@@ -682,6 +675,27 @@
}
}
+/*
+ * Use of Guard requires an external lock to be held before calling
+ * any of its methods
+ */
+Queue::Guard::Guard() : count(0) {}
+
+void Queue::Guard::lock()
+{
+ count++;
+}
+
+void Queue::Guard::unlock()
+{
+ if (--count == 0) condition.notifyAll();
+}
+
+void Queue::Guard::wait(sys::Mutex& m)
+{
+ while (count) condition.wait(m);
+}
+
ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=674848&r1=674847&r2=674848&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jul 8 07:53:51 2008
@@ -38,7 +38,6 @@
#include <vector>
#include <memory>
#include <deque>
-#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -62,9 +61,20 @@
*/
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- typedef std::set<Consumer*> Listeners;
+ typedef qpid::InlineVector<Consumer*, 5> Listeners;
typedef std::deque<QueuedMessage> Messages;
+ class Guard
+ {
+ qpid::sys::Condition condition;
+ size_t count;
+ public:
+ Guard();
+ void lock();
+ void unlock();
+ void wait(sys::Mutex&);
+ };
+
const string name;
const bool autodelete;
MessageStore* store;
@@ -79,6 +89,7 @@
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
+ Guard notifierLock;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
@@ -95,7 +106,6 @@
bool getNextMessage(QueuedMessage& msg, Consumer& c);
bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
bool browseNextMessage(QueuedMessage& msg, Consumer& c);
- bool canExcludeUnwanted();
void notify();
void removeListener(Consumer&);