You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/08/11 12:06:24 UTC
svn commit: r984357 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp
qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp
qpid/broker/QueueCleaner.h tests/QueueTest.cpp
Author: gsim
Date: Wed Aug 11 10:06:24 2010
New Revision: 984357
URL: http://svn.apache.org/viewvc?rev=984357&view=rev
Log:
Revert commits r981517 and r981435 that moved periodic purging of queues onto cluster's timer. If the timer fires during an update it causes errors; it also puts a potentially time-consuming task on the cluster's dispatch thread. Instead don't purge LVQs to avoid cluster inconsistencies (and more directly the assertion that aims to prevent these).
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Aug 11 10:06:24 2010
@@ -156,7 +156,7 @@ Broker::Broker(const Broker::Options& co
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, &timer),
+ queueCleaner(queues, timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
clusterUpdatee(false),
@@ -504,7 +504,6 @@ bool Broker::deferDeliveryImpl(const std
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
- queueCleaner.setTimer(clusterTimer.get());
}
const std::string Broker::TCP_TRANSPORT("tcp");
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Aug 11 10:06:24 2010
@@ -494,16 +494,34 @@ void Queue::purgeExpired()
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
- if (dequeueTracker.sampleRatePerSecond() < 1) {
+ //attempt is less than one per second.
+
+ //Note: This method is currently called periodically on the timer
+ //thread. In a clustered broker this means that the purging does
+ //not occur on the cluster event dispatch thread and consequently
+ //that it is not totally ordered w.r.t. other events (including
+ //publication of messages). However the cluster does ensure that
+ //the actual expiration of messages (as distinct from the removing
+ //of those expired messages from the queue) *is* consistently
+ //ordered w.r.t. cluster events. This means that delivery of
+ //messages is in general consistent across the cluster in spite of
+ //any non-determinism in the triggering of a purge. However at
+ //present purging a last value queue could potentially cause
+ //inconsistencies in the cluster (as the order w.r.t publications
+ //can affect the order in which messages appear in the
+ //queue). Consequently periodic purging of an LVQ is not enabled
+ //(expired messages will be removed on delivery and consolidated
+ //by key as part of normal LVQ operation).
+
+ if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
Messages expired;
{
Mutex::ScopedLock locker(messageLock);
for (Messages::iterator i = messages.begin(); i != messages.end();) {
- if (lastValueQueue) checkLvqReplace(*i);
+ //Re-introduce management of LVQ-specific state here
+ //if purging is re-enabled for that case (see note above)
if (i->payload->hasExpired()) {
expired.push_back(*i);
- clearLVQIndex(*i);
i = messages.erase(i);
} else {
++i;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Wed Aug 11 10:06:24 2010
@@ -26,27 +26,20 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
QueueCleaner::~QueueCleaner()
{
if (task) task->cancel();
}
-void QueueCleaner::setTimer(sys::Timer* t)
-{
- timer = t;
-}
-
void QueueCleaner::start(qpid::sys::Duration p)
{
- if (timer) {
- task = new Task(*this, p);
- timer->add(task);
- }
+ task = new Task(*this, p);
+ timer.add(task);
}
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d, "QueueCleaner::fired"), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
void QueueCleaner::Task::fire()
{
@@ -73,7 +66,7 @@ void QueueCleaner::fired()
queues.eachQueue(collect);
std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
task->setupNextFire();
- if (timer) timer->add(task);
+ timer.add(task);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Wed Aug 11 10:06:24 2010
@@ -35,9 +35,8 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
QPID_BROKER_EXTERN ~QueueCleaner();
- QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
private:
class Task : public sys::TimerTask
@@ -51,7 +50,7 @@ class QueueCleaner
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- sys::Timer* timer;
+ sys::Timer& timer;
void fired();
};
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Aug 11 10:06:24 2010
@@ -687,7 +687,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, &timer);
+ QueueCleaner cleaner(queues, timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org