You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/10/13 19:09:06 UTC
svn commit: r704166 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
tests/
Author: gsim
Date: Mon Oct 13 10:09:06 2008
New Revision: 704166
URL: http://svn.apache.org/viewvc?rev=704166&view=rev
Log:
Periodically purge expired messages from queues
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 13 10:09:06 2008
@@ -300,6 +300,7 @@
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
+ qpid/broker/QueueCleaner.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
qpid/broker/Connection.cpp \
@@ -430,6 +431,7 @@
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
+ qpid/broker/QueueCleaner.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
qpid/broker/Connection.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 13 10:09:06 2008
@@ -87,6 +87,7 @@
stagingThreshold(5000000),
enableMgmt(1),
mgmtPubInterval(10),
+ queueCleanInterval(60*10),//10 minutes
auth(AUTH_DEFAULT),
realm("QPID"),
replayFlushLimit(0),
@@ -114,6 +115,8 @@
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
+ ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
+ "Interval between attempts to purge any expired messages from queues")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
@@ -142,7 +145,8 @@
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+ queueCleaner(queues, timer),
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
@@ -244,6 +248,10 @@
i != plugins.end();
i++)
(*i)->initialize(*this);
+
+ if (conf.queueCleanInterval) {
+ queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
+ }
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Oct 13 10:09:06 2008
@@ -31,6 +31,7 @@
#include "QueueRegistry.h"
#include "LinkRegistry.h"
#include "SessionManager.h"
+#include "QueueCleaner.h"
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
@@ -90,6 +91,7 @@
uint64_t stagingThreshold;
bool enableMgmt;
uint16_t mgmtPubInterval;
+ uint16_t queueCleanInterval;
bool auth;
std::string realm;
size_t replayFlushLimit;
@@ -120,6 +122,7 @@
qmf::org::apache::qpid::broker::Broker* mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
+ QueueCleaner queueCleaner;
void declareStandardExchange(const std::string& name, const std::string& type);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 13 10:09:06 2008
@@ -403,6 +403,19 @@
return msg;
}
+void Queue::purgeExpired()
+{
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); ) {
+ if (i->payload->hasExpired()) {
+ dequeue(0, *i);
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
+ }
+}
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 13 10:09:06 2008
@@ -189,6 +189,7 @@
void cancel(Consumer::shared_ptr c);
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ void purgeExpired();
//move qty # of messages to destination Queue destq
uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=704166&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Oct 13 10:09:06 2008
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueCleaner.h"
+
+#include "Broker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
+
+void QueueCleaner::start(qpid::sys::Duration p)
+{
+ task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
+ timer.add(task);
+}
+
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
+
+void QueueCleaner::Task::fire()
+{
+ parent.fired();
+}
+
+void QueueCleaner::fired()
+{
+ queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
+ task->reset();
+ timer.add(task);
+}
+
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=704166&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Mon Oct 13 10:09:06 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_QUEUECLEANER_H
+#define QPID_BROKER_QUEUECLEANER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class QueueRegistry;
+/**
+ * TimerTask to purge expired messages from queues
+ */
+class QueueCleaner
+{
+ public:
+ QueueCleaner(QueueRegistry& queues, Timer& timer);
+ void start(qpid::sys::Duration period);
+ private:
+ class Task : public TimerTask
+ {
+ public:
+ Task(QueueCleaner& parent, qpid::sys::Duration duration);
+ void fire();
+ private:
+ QueueCleaner& parent;
+ };
+
+ boost::intrusive_ptr<TimerTask> task;
+ QueueRegistry& queues;
+ Timer& timer;
+
+ void fired();
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUECLEANER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Oct 13 10:09:06 2008
@@ -42,6 +42,7 @@
using qpid::sys::Monitor;
using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
+using qpid::broker::Broker;
using std::string;
using std::cout;
using std::endl;
@@ -94,6 +95,8 @@
struct ClientSessionFixture : public ProxySessionFixture
{
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
+
void declareSubscribe(const string& q="my-queue",
const string& dest="my-dest")
{
@@ -282,6 +285,43 @@
BOOST_CHECK(!c.isOpen());
}
+QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
+ Broker::Options opts;
+ opts.queueCleanInterval = 1;
+ ClientSessionFixture fix(opts);
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+ for (uint i = 0; i < 10; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ if (i % 2) m.getDeliveryProperties().setTtl(500);
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
+ sleep(2);
+ BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+}
+
+QPID_AUTO_TEST_CASE(testExpirationOnPop) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+ for (uint i = 0; i < 10; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ if (i % 2) m.getDeliveryProperties().setTtl(200);
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ ::usleep(300* 1000);
+
+ for (uint i = 0; i < 10; i++) {
+ if (i % 2) continue;
+ Message m;
+ BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ }
+}
+
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=704166&r1=704165&r2=704166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Oct 13 10:09:06 2008
@@ -20,6 +20,7 @@
*/
#include "unit_test.h"
#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -460,6 +461,44 @@
}
+void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
+{
+ for (uint i = 0; i < count; i++) {
+ intrusive_ptr<Message> m = message("exchange", "key");
+ if (i % 2) {
+ if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
+ } else {
+ if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
+ }
+ m->setTimestamp();
+ queue.deliver(m);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPurgeExpired) {
+ Queue queue("my-queue");
+ addMessagesToQueue(10, queue);
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
+ ::usleep(300*1000);
+ queue.purgeExpired();
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
+}
+
+QPID_AUTO_TEST_CASE(testQueueCleaner) {
+ Timer timer;
+ QueueRegistry queues;
+ Queue::shared_ptr queue = queues.declare("my-queue").first;
+ addMessagesToQueue(10, *queue, 200, 400);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
+
+ QueueCleaner cleaner(queues, timer);
+ cleaner.start(100 * qpid::sys::TIME_MSEC);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+}
+
QPID_AUTO_TEST_SUITE_END()