You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/10/13 19:09:06 UTC

svn commit: r704166 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ tests/

Author: gsim
Date: Mon Oct 13 10:09:06 2008
New Revision: 704166

URL: http://svn.apache.org/viewvc?rev=704166&view=rev
Log:
Periodically purge expired messages from queues


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 13 10:09:06 2008
@@ -300,6 +300,7 @@
   qpid/broker/BrokerSingleton.cpp \
   qpid/broker/Exchange.cpp \
   qpid/broker/Queue.cpp \
+  qpid/broker/QueueCleaner.cpp \
   qpid/broker/PersistableMessage.cpp \
   qpid/broker/Bridge.cpp \
   qpid/broker/Connection.cpp \
@@ -430,6 +431,7 @@
   qpid/broker/SessionAdapter.h \
   qpid/broker/Exchange.h \
   qpid/broker/Queue.h \
+  qpid/broker/QueueCleaner.h \
   qpid/broker/BrokerSingleton.h \
   qpid/broker/Bridge.h \
   qpid/broker/Connection.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 13 10:09:06 2008
@@ -87,6 +87,7 @@
     stagingThreshold(5000000),
     enableMgmt(1),
     mgmtPubInterval(10),
+    queueCleanInterval(60*10),//10 minutes
     auth(AUTH_DEFAULT),
     realm("QPID"),
     replayFlushLimit(0),
@@ -114,6 +115,8 @@
         ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
+        ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), 
+         "Interval between attempts to purge any expired messages from queues")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
         ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
         ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
@@ -142,7 +145,8 @@
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
         *this),
-   getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+    queueCleaner(queues, timer),
+    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if(conf.enableMgmt){
         QPID_LOG(info, "Management enabled");
@@ -244,6 +248,10 @@
          i != plugins.end();
          i++)
         (*i)->initialize(*this);
+
+    if (conf.queueCleanInterval) {
+        queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
+    }
 }
 
 void Broker::declareStandardExchange(const std::string& name, const std::string& type)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Oct 13 10:09:06 2008
@@ -31,6 +31,7 @@
 #include "QueueRegistry.h"
 #include "LinkRegistry.h"
 #include "SessionManager.h"
+#include "QueueCleaner.h"
 #include "Vhost.h"
 #include "System.h"
 #include "Timer.h"
@@ -90,6 +91,7 @@
         uint64_t stagingThreshold;
         bool enableMgmt;
         uint16_t mgmtPubInterval;
+        uint16_t queueCleanInterval;
         bool auth;
         std::string realm;
         size_t replayFlushLimit;
@@ -120,6 +122,7 @@
     qmf::org::apache::qpid::broker::Broker* mgmtObject;
     Vhost::shared_ptr            vhostObject;
     System::shared_ptr           systemObject;
+    QueueCleaner queueCleaner;
 
     void declareStandardExchange(const std::string& name, const std::string& type);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 13 10:09:06 2008
@@ -403,6 +403,19 @@
     return msg;
 }
 
+void Queue::purgeExpired() // Scans 'messages' and removes every entry whose payload reports hasExpired().
+{
+    Mutex::ScopedLock locker(messageLock); // held for the whole scan; NOTE(review): dequeue() below runs with this lock held - confirm that is safe
+    for (Messages::iterator i = messages.begin(); i != messages.end(); ) { // no increment clause: the body advances i
+        if (i->payload->hasExpired()) {
+            dequeue(0, *i); // first argument is 0 (presumably "no transaction context" - TODO confirm against dequeue's signature)
+            i = messages.erase(i); // continue the scan from the iterator that erase() returns
+        } else {
+            ++i;
+        }
+    } 
+}
+
 /**
  * purge - for purging all or some messages on a queue
  *         depending on the purge_request

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 13 10:09:06 2008
@@ -189,6 +189,7 @@
             void cancel(Consumer::shared_ptr c);
 
             uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages 
+            void purgeExpired();
 
 	    //move qty # of messages to destination Queue destq
 	    uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=704166&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Oct 13 10:09:06 2008
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueCleaner.h"
+
+#include "Broker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {} // only binds the registry and timer references; no task is created until start()
+
+void QueueCleaner::start(qpid::sys::Duration p) // Creates the purge task with duration p and adds it to the timer.
+{
+    task = boost::intrusive_ptr<TimerTask>(new Task(*this, p)); // the task stores a reference to this QueueCleaner
+    timer.add(task);
+}
+
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {} // d is handed to TimerTask; p is the cleaner that fire() calls back
+
+void QueueCleaner::Task::fire() // Forwards to QueueCleaner::fired() (presumably invoked by the Timer when the task is due - TODO confirm).
+{
+    parent.fired();
+}
+
+void QueueCleaner::fired() // Runs one purge pass over the registry's queues, then puts the task back on the timer.
+{
+    queues.eachQueue(boost::bind(&Queue::purgeExpired, _1)); // purgeExpired() is the callback handed to eachQueue for every queue it visits
+    task->reset(); // presumably re-arms the task for another period - TODO confirm against TimerTask::reset
+    timer.add(task); // re-added on every firing so the purge keeps repeating
+}
+
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=704166&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Mon Oct 13 10:09:06 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_QUEUECLEANER_H
+#define QPID_BROKER_QUEUECLEANER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class QueueRegistry;
+/**
+ * Purges expired messages from the queues of a QueueRegistry each time its TimerTask fires.
+ */
+class QueueCleaner
+{
+  public:
+    QueueCleaner(QueueRegistry& queues, Timer& timer);
+    void start(qpid::sys::Duration period); // creates the purge task and adds it to the timer
+  private:
+    class Task : public TimerTask // timer task whose fire() calls back into the owning QueueCleaner
+    {
+      public:
+        Task(QueueCleaner& parent, qpid::sys::Duration duration);
+        void fire();
+      private:
+        QueueCleaner& parent; // NOTE(review): plain reference and QueueCleaner declares no destructor - confirm the task cannot fire after the cleaner is destroyed
+    };
+    
+    boost::intrusive_ptr<TimerTask> task;
+    QueueRegistry& queues;
+    Timer& timer;
+
+    void fired(); // called from Task::fire()
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUECLEANER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Oct 13 10:09:06 2008
@@ -42,6 +42,7 @@
 using qpid::sys::Monitor;
 using qpid::sys::Thread;
 using qpid::sys::TIME_SEC;
+using qpid::broker::Broker;
 using std::string;
 using std::cout;
 using std::endl;
@@ -94,6 +95,8 @@
 
 struct ClientSessionFixture : public ProxySessionFixture
 {
+    ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
+
     void declareSubscribe(const string& q="my-queue",
                           const string& dest="my-dest")
     {
@@ -282,6 +285,43 @@
     BOOST_CHECK(!c.isOpen());
 }
 
+QPID_AUTO_TEST_CASE(testPeriodicExpiration) { // With a 1-second purge interval, the messages given a TTL should leave the queue without being consumed.
+    Broker::Options opts;
+    opts.queueCleanInterval = 1; // seconds (the "queue-purge-interval" broker option)
+    ClientSessionFixture fix(opts);
+    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+    for (uint i = 0; i < 10; i++) {        
+        Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");        
+        if (i % 2) m.getDeliveryProperties().setTtl(500); // only odd-indexed messages get a TTL (presumably milliseconds - TODO confirm)
+        fix.session.messageTransfer(arg::content=m);
+    }
+
+    BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
+    sleep(2); // NOTE(review): timing-dependent - relies on a purge pass running within this window
+    BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); // the five messages without a TTL remain
+}
+
+QPID_AUTO_TEST_CASE(testExpirationOnPop) { // Uses default broker options; expects only the messages without a TTL to be returned by get().
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+    for (uint i = 0; i < 10; i++) {        
+        Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");        
+        if (i % 2) m.getDeliveryProperties().setTtl(200); // only odd-indexed messages get a TTL (presumably milliseconds - TODO confirm)
+        fix.session.messageTransfer(arg::content=m);
+    }
+
+    ::usleep(300* 1000); // 300 ms pause, longer than the TTL value set above
+
+    for (uint i = 0; i < 10; i++) {        
+        if (i % 2) continue; // odd-indexed messages are expected to have expired, so they are not fetched
+        Message m;
+        BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC));
+        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); // even-indexed messages arrive in publish order
+    }
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Oct 13 10:09:06 2008
@@ -20,6 +20,7 @@
  */
 #include "unit_test.h"
 #include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/ExchangeRegistry.h"
@@ -460,6 +461,44 @@
 
 }
 
+void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) // Test helper: delivers 'count' messages, setting oddTtl/evenTtl on odd/even indices.
+{
+    for (uint i = 0; i < count; i++) {
+        intrusive_ptr<Message> m = message("exchange", "key");
+        if (i % 2) {
+            if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); // a TTL of 0 means setTtl is not called at all
+        } else {
+            if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
+        }
+        m->setTimestamp(); // called after the TTL is set and before delivery (presumably establishes the expiry time - TODO confirm)
+        queue.deliver(m);
+    }
+}
+
+QPID_AUTO_TEST_CASE(testPurgeExpired) { // Calls Queue::purgeExpired() directly, with no timer involved.
+    Queue queue("my-queue");
+    addMessagesToQueue(10, queue); // defaults: odd-indexed messages get TTL 200, even-indexed get none
+    BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
+    ::usleep(300*1000); // 300 ms pause, longer than the odd-index TTL value (presumably milliseconds - TODO confirm)
+    queue.purgeExpired();
+    BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); // the five messages without a TTL remain
+}
+
+QPID_AUTO_TEST_CASE(testQueueCleaner) { // Drives QueueCleaner from a Timer and checks the queue drains in two stages.
+    Timer timer;
+    QueueRegistry queues;
+    Queue::shared_ptr queue = queues.declare("my-queue").first;
+    addMessagesToQueue(10, *queue, 200, 400); // odd-indexed messages get TTL 200, even-indexed get TTL 400
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
+
+    QueueCleaner cleaner(queues, timer); // NOTE(review): declared after 'timer', so it is destroyed first while its task may still be scheduled - confirm this is safe
+    cleaner.start(100 * qpid::sys::TIME_MSEC);
+    ::usleep(300*1000); // NOTE(review): timing-dependent - expects only the TTL-200 messages to have been purged by now
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
+    ::usleep(300*1000); // about 600 ms in total, past the TTL-400 value as well
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+}
+
 QPID_AUTO_TEST_SUITE_END()