You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/08/11 12:06:24 UTC

svn commit: r984357 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueCleaner.h tests/QueueTest.cpp

Author: gsim
Date: Wed Aug 11 10:06:24 2010
New Revision: 984357

URL: http://svn.apache.org/viewvc?rev=984357&view=rev
Log:
Revert commits r981517 and r981435, which moved periodic purging of queues onto the cluster's timer. If the timer fires during an update it causes errors; it also puts a potentially time-consuming task on the cluster's dispatch thread. Instead, don't purge LVQs, to avoid cluster inconsistencies (and, more directly, the assertion that aims to prevent these).

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Aug 11 10:06:24 2010
@@ -156,7 +156,7 @@ Broker::Broker(const Broker::Options& co
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
         *this),
-    queueCleaner(queues, &timer),
+    queueCleaner(queues, timer),
     queueEvents(poller,!conf.asyncQueueEvents), 
     recovery(true),
     clusterUpdatee(false),
@@ -504,7 +504,6 @@ bool Broker::deferDeliveryImpl(const std
 
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
     clusterTimer = t;
-    queueCleaner.setTimer(clusterTimer.get());
 }
 
 const std::string Broker::TCP_TRANSPORT("tcp");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Aug 11 10:06:24 2010
@@ -494,16 +494,34 @@ void Queue::purgeExpired()
 {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
-    //attempt is less than one per second.
-    if (dequeueTracker.sampleRatePerSecond() < 1) {
+    //attempt is less than one per second.  
+
+    //Note: This method is currently called periodically on the timer
+    //thread. In a clustered broker this means that the purging does
+    //not occur on the cluster event dispatch thread and consequently
+    //that it is not totally ordered w.r.t. other events (including
+    //publication of messages). However the cluster does ensure that
+    //the actual expiration of messages (as distinct from the removing
+    //of those expired messages from the queue) *is* consistently
+    //ordered w.r.t. cluster events. This means that delivery of
+    //messages is in general consistent across the cluster in spite of
+    //any non-determinism in the triggering of a purge. However at
+    //present purging a last value queue could potentially cause
+    //inconsistencies in the cluster (as the order w.r.t. publications
+    //can affect the order in which messages appear in the
+    //queue). Consequently periodic purging of an LVQ is not enabled
+    //(expired messages will be removed on delivery and consolidated
+    //by key as part of normal LVQ operation).
+
+    if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
         Messages expired;
         {
             Mutex::ScopedLock locker(messageLock);
             for (Messages::iterator i = messages.begin(); i != messages.end();) {
-                if (lastValueQueue) checkLvqReplace(*i);
+                //Re-introduce management of LVQ-specific state here
+                //if purging is re-enabled for that case (see note above)
                 if (i->payload->hasExpired()) {
                     expired.push_back(*i);
-                    clearLVQIndex(*i);
                     i = messages.erase(i);
                 } else {
                     ++i;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Wed Aug 11 10:06:24 2010
@@ -26,27 +26,20 @@
 namespace qpid {
 namespace broker {
 
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
 
 QueueCleaner::~QueueCleaner()
 {
     if (task) task->cancel();
 }
 
-void QueueCleaner::setTimer(sys::Timer* t)
-{
-    timer = t;
-}
-
 void QueueCleaner::start(qpid::sys::Duration p)
 {
-    if (timer) {
-        task = new Task(*this, p);
-        timer->add(task);
-    }
+    task = new Task(*this, p);
+    timer.add(task);
 }
 
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d, "QueueCleaner::fired"), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
 
 void QueueCleaner::Task::fire()
 {
@@ -73,7 +66,7 @@ void QueueCleaner::fired()
     queues.eachQueue(collect);
     std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
     task->setupNextFire();
-    if (timer) timer->add(task);
+    timer.add(task);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Wed Aug 11 10:06:24 2010
@@ -35,9 +35,8 @@ class QueueRegistry;
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
     QPID_BROKER_EXTERN ~QueueCleaner();
-    QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
     QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
   private:
     class Task : public sys::TimerTask
@@ -51,7 +50,7 @@ class QueueCleaner
 
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
-    sys::Timer* timer;
+    sys::Timer& timer;
 
     void fired();
 };

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=984357&r1=984356&r2=984357&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Aug 11 10:06:24 2010
@@ -687,7 +687,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
-    QueueCleaner cleaner(queues, &timer);
+    QueueCleaner cleaner(queues, timer);
     cleaner.start(100 * qpid::sys::TIME_MSEC);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org