You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/29 23:59:10 UTC
svn commit: r904656 - in /qpid/trunk/qpid/cpp: src/ src/qpid/broker/
src/qpid/cluster/ src/qpid/management/ src/qpid/sys/ xml/
Author: aconway
Date: Fri Jan 29 22:59:09 2010
New Revision: 904656
URL: http://svn.apache.org/viewvc?rev=904656&view=rev
Log:
Replace PeriodicTimer with ClusterTimer, which inherits from Timer.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/cluster.cmake
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Jan 29 22:59:09 2010
@@ -573,7 +573,6 @@
qpid/sys/Runnable.cpp
qpid/sys/Shlib.cpp
qpid/sys/Timer.cpp
- qpid/sys/PeriodicTimerImpl.cpp
)
add_library (qpidcommon SHARED ${qpidcommon_SOURCES})
@@ -735,7 +734,6 @@
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/ConnectionFactory.cpp
- qpid/broker/DelegatingPeriodicTimer.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
qpid/broker/DirectExchange.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Jan 29 22:59:09 2010
@@ -471,9 +471,6 @@
qpid/sys/TimeoutHandler.h \
qpid/sys/Timer.cpp \
qpid/sys/Timer.h \
- qpid/sys/PeriodicTimer.h \
- qpid/sys/PeriodicTimerImpl.h \
- qpid/sys/PeriodicTimerImpl.cpp \
qpid/sys/Waitable.h \
qpid/sys/alloca.h \
qpid/sys/uuid.h
@@ -508,8 +505,6 @@
qpid/broker/Consumer.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
- qpid/broker/DelegatingPeriodicTimer.h \
- qpid/broker/DelegatingPeriodicTimer.cpp \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliverableMessage.h \
Modified: qpid/trunk/qpid/cpp/src/cluster.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.cmake?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.cmake (original)
+++ qpid/trunk/qpid/cpp/src/cluster.cmake Fri Jan 29 22:59:09 2010
@@ -81,6 +81,8 @@
${CMAN_SOURCES}
qpid/cluster/Cluster.cpp
qpid/cluster/Cluster.h
+ qpid/cluster/ClusterTimer.cpp
+ qpid/cluster/ClusterTimer.h
qpid/cluster/Decoder.cpp
qpid/cluster/Decoder.h
qpid/cluster/PollableQueue.h
@@ -129,8 +131,6 @@
qpid/cluster/MemberSet.h
qpid/cluster/MemberSet.cpp
qpid/cluster/types.h
- qpid/cluster/PeriodicTimerImpl.h
- qpid/cluster/PeriodicTimerImpl.cpp
qpid/cluster/StoreStatus.h
qpid/cluster/StoreStatus.cpp
)
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Jan 29 22:59:09 2010
@@ -40,6 +40,8 @@
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
+ qpid/cluster/ClusterTimer.cpp \
+ qpid/cluster/ClusterTimer.h \
qpid/cluster/Decoder.cpp \
qpid/cluster/Decoder.h \
qpid/cluster/PollableQueue.h \
@@ -88,8 +90,6 @@
qpid/cluster/MemberSet.h \
qpid/cluster/MemberSet.cpp \
qpid/cluster/types.h \
- qpid/cluster/PeriodicTimerImpl.h \
- qpid/cluster/PeriodicTimerImpl.cpp \
qpid/cluster/StoreStatus.h \
qpid/cluster/StoreStatus.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jan 29 22:59:09 2010
@@ -49,7 +49,6 @@
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/PeriodicTimerImpl.h"
#include "qpid/Address.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
@@ -257,11 +256,7 @@
// Initialize plugins
Plugin::initializeAll(*this);
- if (!periodicTimer.hasDelegate()) {
- // If no plugin has contributed a PeriodicTimer, use the default one.
- periodicTimer.setDelegate(
- std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
- }
+ if (managementAgent.get()) managementAgent->pluginsInitialized();
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
@@ -474,8 +469,8 @@
return knownBrokers;
}
-void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
- periodicTimer.setDelegate(pt);
+/**
+ * Install the timer returned by getClusterTimer().
+ * Ownership moves from the caller's auto_ptr into the broker.
+ */
+void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
+    clusterTimer.reset(t.release());
}
const std::string Broker::TCP_TRANSPORT("tcp");
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jan 29 22:59:09 2010
@@ -25,7 +25,6 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DelegatingPeriodicTimer.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -147,7 +146,7 @@
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
- DelegatingPeriodicTimer periodicTimer;
+ std::auto_ptr<sys::Timer> clusterTimer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -254,9 +253,12 @@
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
+ /** Timer for local tasks affecting only this broker */
sys::Timer& getTimer() { return timer; }
- sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
- void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
+
+    /**
+     * Timer for tasks that must be synchronized if we are in a cluster.
+     * Falls back to the local timer when no cluster timer has been set.
+     */
+    sys::Timer& getClusterTimer() {
+        if (clusterTimer.get()) return *clusterTimer;
+        return timer;
+    }
+ void setClusterTimer(std::auto_ptr<sys::Timer>);
boost::function<std::vector<Url> ()> getKnownBrokers;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 29 22:59:09 2010
@@ -112,7 +112,7 @@
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
#include "qpid/cluster/UpdateExchange.h"
-#include "qpid/cluster/PeriodicTimerImpl.h"
+#include "qpid/cluster/ClusterTimer.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -137,7 +137,7 @@
#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -179,7 +179,7 @@
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 903171;
+const uint32_t Cluster::CLUSTER_VERSION = 904565;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -209,9 +209,8 @@
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
- void periodicTimer(const std::string& name) {
- cluster.periodicTimer(member, name, l);
- }
+    // timer-wakeup control: pass the task name on to the Cluster together
+    // with this dispatcher's member and lock.
+    void timerWakeup(const std::string& name) {
+        cluster.timerWakeup(member, name, l);
+    }
+    // timer-drop control: routed to Cluster::timerDrop, which discards the
+    // named task without firing it.
+    void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
@@ -245,6 +244,7 @@
state(INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
+ elder(false),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -252,8 +252,8 @@
{
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
- timer = new PeriodicTimerImpl(*this);
- broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
+ timer = new ClusterTimer(*this);
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -577,14 +577,13 @@
initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
- if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+ if (elders.empty())
+ becomeElder();
+ else {
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
QPID_LOG(info, *this << " not active for links.");
}
- else {
- QPID_LOG(info, this << " active for links.");
- }
setClusterId(initMap.getClusterId(), l);
if (store.hasStore()) store.dirty(clusterId);
@@ -636,14 +635,19 @@
if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- if (elders.empty()) {
- // We are the oldest, reactive links if necessary
- QPID_LOG(info, this << " becoming active for links.");
- broker.getLinks().setPassive(false);
- }
+ if (elders.empty()) becomeElder();
}
}
+/**
+ * Promote this member to elder: record the fact, make links active and let
+ * the cluster timer schedule its tasks. Does nothing if we are already the
+ * elder.
+ */
+void Cluster::becomeElder() {
+    if (!elder) {
+        // We are the oldest, reactivate links if necessary
+        QPID_LOG(info, *this << " became the elder, active for links.");
+        elder = true;
+        broker.getLinks().setPassive(false);
+        timer->becomeElder();
+    }
+}
+
void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -962,13 +966,17 @@
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
- timer->deliver(name);
+// Delivered timer-wakeup control: fire the named task via the cluster timer.
+// The member id and lock arguments are not used here.
+void Cluster::timerWakeup(const MemberId& /*member*/, const std::string& name, Lock& /*l*/)
+{
+    timer->deliverWakeup(name);
+}
+
+// Delivered timer-drop control: discard the named task via the cluster timer.
+// The member id and lock arguments are not used here.
+void Cluster::timerDrop(const MemberId& /*member*/, const std::string& name, Lock& /*l*/)
+{
+    timer->deliverDrop(name);
}
bool Cluster::isElder() const {
Monitor::ScopedLock l(lock);
- return state >= CATCHUP && elders.empty();
+ return elder;
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jan 29 22:59:09 2010
@@ -63,7 +63,7 @@
class Connection;
class EventFrame;
-class PeriodicTimerImpl;
+class ClusterTimer;
/**
* Connection to the cluster
@@ -164,7 +164,8 @@
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
- void periodicTimer(const MemberId&, const std::string& name, Lock&);
+ void timerWakeup(const MemberId&, const std::string& name, Lock&);
+ void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
@@ -201,6 +202,8 @@
const struct cpg_address */*joined*/, int /*nJoined*/
);
+ void becomeElder();
+
// == Called in management threads.
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
@@ -265,6 +268,7 @@
StoreStatus store;
ClusterMap map;
MemberSet elders;
+ bool elder;
size_t lastSize;
bool lastBroker;
sys::Thread updateThread;
@@ -272,7 +276,7 @@
bool updateRetracted;
ErrorCheck error;
UpdateReceiver updateReceiver;
- PeriodicTimerImpl* timer;
+ ClusterTimer* timer;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=904656&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp Fri Jan 29 22:59:09 2010
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "ClusterTimer.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterTimerDropBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using boost::intrusive_ptr;
+using std::max;
+using sys::Timer;
+using sys::TimerTask;
+
+
+/** Create a timer bound to cluster `c`; no tasks are registered yet. */
+ClusterTimer::ClusterTimer(Cluster& c)
+    : cluster(c)
+{}
+
+// Nothing to release explicitly here.
+ClusterTimer::~ClusterTimer()
+{
+}
+
+/**
+ * Register a named task with the cluster timer.
+ *
+ * Every member records the task under its name so that a later
+ * deliverWakeup()/deliverDrop() for that name can find it, but only the
+ * elder schedules it on the underlying sys::Timer.
+ *
+ * Called from the initialization or deliver thread.
+ *
+ * @param task the task to register; its name must not already be registered.
+ * @throws Exception if a task with the same name is already registered.
+ *
+ * NOTE(review): cluster.isElder() takes the cluster lock; confirm add() is
+ * never reached with that lock already held, or that the lock is recursive.
+ */
+void ClusterTimer::add(intrusive_ptr<TimerTask> task)
+{
+    // getName() returns by value, so fetch it once.
+    const std::string name(task->getName());
+    QPID_LOG(trace, "Adding cluster timer task " << name);
+    // A single insert both detects a duplicate name and stores the task.
+    if (!map.insert(Map::value_type(name, task)).second)
+        throw Exception(QPID_MSG("Task already exists with name " << name));
+    // Only the elder actually activates the task with the Timer base class.
+    if (cluster.isElder()) {
+        QPID_LOG(trace, "Elder activating cluster timer task " << name);
+        Timer::add(task);
+    }
+}
+
+/**
+ * Timer thread: called by the base Timer when a scheduled task is due.
+ *
+ * The task is not run here. The elder multicasts a wakeup naming the task,
+ * and the task is fired when that wakeup is delivered (see deliverWakeup).
+ * A non-elder only logs.
+ */
+void ClusterTimer::fire(intrusive_ptr<TimerTask> t) {
+    if (!cluster.isElder()) {
+        QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName());
+        return;
+    }
+    QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName());
+    framing::ClusterTimerWakeupBody wakeup(framing::ProtocolVersion(), t->getName());
+    cluster.getMulticast().mcastControl(wakeup, cluster.getId());
+}
+
+/**
+ * Timer thread: called by the base Timer when it drops a task.
+ *
+ * The elder multicasts a drop naming the task; the task is removed when that
+ * drop is delivered (see deliverDrop). A non-elder only logs.
+ */
+void ClusterTimer::drop(intrusive_ptr<TimerTask> t) {
+    if (!cluster.isElder()) {
+        QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName());
+        return;
+    }
+    QPID_LOG(trace, "Sending cluster timer drop " << t->getName());
+    framing::ClusterTimerDropBody dropBody(framing::ProtocolVersion(), t->getName());
+    cluster.getMulticast().mcastControl(dropBody, cluster.getId());
+}
+
+/**
+ * Deliver thread: a wakeup for `name` has been delivered, so fire that task.
+ *
+ * @throws Exception if no task is registered under `name`.
+ */
+void ClusterTimer::deliverWakeup(const std::string& name) {
+    QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+    Map::iterator pos = map.find(name);
+    if (pos == map.end())
+        throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
+    // Hold our own reference and remove the entry before firing: a task may
+    // register a successor under the same name from inside its fire()
+    // (ManagementAgent::Periodic does).
+    intrusive_ptr<TimerTask> task = pos->second;
+    map.erase(pos);
+    Timer::fire(task);
+}
+
+/**
+ * Deliver thread: a drop for `name` has been delivered, so forget that task
+ * without firing it.
+ *
+ * @throws Exception if no task is registered under `name`.
+ */
+void ClusterTimer::deliverDrop(const std::string& name) {
+    QPID_LOG(trace, "Cluster timer drop delivered for " << name);
+    Map::iterator i = map.find(name);
+    if (i == map.end())
+        throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name));
+    map.erase(i);
+}
+
+/**
+ * Deliver thread: this member has become the elder.
+ *
+ * add() only schedules tasks on the base Timer when called on the elder, so
+ * every task currently recorded in the map is handed to the base Timer here.
+ */
+void ClusterTimer::becomeElder() {
+    Map::iterator pending = map.begin();
+    while (pending != map.end()) {
+        Timer::add(pending->second);
+        ++pending;
+    }
+}
+
+}}
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h?rev=904656&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h Fri Jan 29 22:59:09 2010
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_CLUSTERTIMER_H
+#define QPID_CLUSTER_CLUSTERTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Timer.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class Cluster;
+
+/**
+ * Timer whose tasks are driven by cluster controls.
+ *
+ * Tasks are recorded by name on every member. Only the elder schedules them
+ * on the underlying sys::Timer; when one fires or is dropped there, the elder
+ * multicasts a control, and the task is actually fired or discarded when that
+ * control is delivered (deliverWakeup / deliverDrop).
+ */
+class ClusterTimer : public sys::Timer {
+  public:
+    ClusterTimer(Cluster&);
+    virtual ~ClusterTimer();
+
+    /** Record a named task; the elder also schedules it. Throws if the name is in use. */
+    virtual void add(boost::intrusive_ptr<sys::TimerTask> task);
+
+    /** Fire the named task in response to a delivered wakeup control. */
+    void deliverWakeup(const std::string& name);
+    /** Discard the named task in response to a delivered drop control. */
+    void deliverDrop(const std::string& name);
+    /** Schedule all recorded tasks; called when this member becomes the elder. */
+    void becomeElder();
+
+  protected:
+    /** On the elder, multicast a wakeup instead of firing the task directly. */
+    virtual void fire(boost::intrusive_ptr<sys::TimerTask> task);
+    /** On the elder, multicast a drop for the task. */
+    virtual void drop(boost::intrusive_ptr<sys::TimerTask> task);
+
+  private:
+    /** Recorded tasks, keyed by task name. */
+    typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map;
+
+    Cluster& cluster;
+    Map map;
+};
+
+
+}}
+
+
+#endif /*!QPID_CLUSTER_CLUSTERTIMER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Jan 29 22:59:09 2010
@@ -52,7 +52,8 @@
}
ManagementAgent::ManagementAgent () :
- threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
+ threadPoolSize(1), interval(10), broker(0), timer(0),
+ startTime(uint64_t(Duration(now())))
{
nextObjectId = 1;
brokerBank = 1;
@@ -91,12 +92,8 @@
dataDir = _dataDir;
interval = _interval;
broker = _broker;
- timer = &_broker->getPeriodicTimer();
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
- timer->add (boost::bind(&ManagementAgent::periodicProcessing, this),
- interval * sys::TIME_SEC,
- "ManagementAgent::periodicProcessing");
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -135,6 +132,12 @@
}
}
+/**
+ * Pick up the broker's cluster timer and schedule the first periodic
+ * processing task on it.
+ */
+void ManagementAgent::pluginsInitialized() {
+    // Do this here so cluster plugin has the chance to set up the timer.
+    qpid::sys::Timer& clusterTimer = broker->getClusterTimer();
+    timer = &clusterTimer;
+    clusterTimer.add(new Periodic(*this, interval));
+}
+
void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
@@ -233,6 +236,19 @@
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
}
+/**
+ * Build a timer task named "ManagementAgent::periodicProcessing" that is due
+ * after `_seconds` seconds; a value of 0 is treated as 1 second.
+ */
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+    : TimerTask (qpid::sys::Duration((_seconds == 0 ? 1 : _seconds) * qpid::sys::TIME_SEC),
+                 "ManagementAgent::periodicProcessing"),
+      agent(_agent)
+{
+}
+
+// Nothing to release: the task only holds a reference to its agent.
+ManagementAgent::Periodic::~Periodic ()
+{
+}
+
+/**
+ * Timer callback: schedule a fresh Periodic task for the next interval, then
+ * run this round of the agent's periodic processing.
+ */
+void ManagementAgent::Periodic::fire ()
+{
+    boost::intrusive_ptr<qpid::sys::TimerTask> next(new Periodic (agent, agent.interval));
+    agent.timer->add (next);
+    agent.periodicProcessing ();
+}
+
void ManagementAgent::clientAdded (const std::string& routingKey)
{
if (routingKey.find("console") != 0)
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Jan 29 22:59:09 2010
@@ -26,7 +26,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/PeriodicTimer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -65,8 +65,12 @@
ManagementAgent ();
virtual ~ManagementAgent ();
+ /** Called before plugins are initialized */
void configure (const std::string& dataDir, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
+ /** Called after plugins are initialized. */
+ void pluginsInitialized();
+
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
@@ -112,6 +116,15 @@
void setBootSequence(uint16_t b) { bootSequence = b; }
private:
+    /**
+     * Timer task that runs the agent's periodic processing and schedules
+     * its own successor each time it fires.
+     */
+    struct Periodic : public qpid::sys::TimerTask
+    {
+        /** The agent whose periodic processing this task triggers. */
+        ManagementAgent& agent;
+
+        /** @param seconds delay before the task fires. */
+        Periodic (ManagementAgent& agent, uint32_t seconds);
+        virtual ~Periodic ();
+        virtual void fire ();
+    };
+
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
@@ -203,7 +216,7 @@
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
- qpid::sys::PeriodicTimer* timer;
+ qpid::sys::Timer* timer;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp Fri Jan 29 22:59:09 2010
@@ -30,14 +30,16 @@
namespace qpid {
namespace sys {
-TimerTask::TimerTask(Duration timeout) :
+TimerTask::TimerTask(Duration timeout, const std::string& n) :
+ name(n),
sortTime(AbsTime::FarFuture()),
period(timeout),
nextFireTime(AbsTime::now(), timeout),
cancelled(false)
{}
-TimerTask::TimerTask(AbsTime time) :
+TimerTask::TimerTask(AbsTime time, const std::string& n) :
+ name(n),
sortTime(AbsTime::FarFuture()),
period(0),
nextFireTime(time),
@@ -102,13 +104,15 @@
{
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
+ drop(t);
if (delay > 500 * TIME_MSEC) {
- QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC << "ms late");
+ QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC
+ << "ms late");
}
continue;
} else if(Duration(t->nextFireTime, start) >= 0) {
Monitor::ScopedUnlock u(monitor);
- t->fireTask();
+ fire(t);
// Warn on callback overrun
AbsTime end(AbsTime::now());
Duration overrun(tasks.top()->nextFireTime, end);
@@ -169,6 +173,14 @@
runner.join();
}
+/**
+ * Hook called by the timer thread when a task is due.
+ * The default simply runs the task; subclasses may override to change what
+ * happens when a task fires.
+ */
+void Timer::fire(boost::intrusive_ptr<TimerTask> t)
+{
+    t->fireTask();
+}
+
+/**
+ * Hook for subclasses: called by the timer thread when it finds that a
+ * task has been cancelled and drops it. The default does nothing.
+ */
+void Timer::drop(boost::intrusive_ptr<TimerTask>)
+{
+}
+
bool operator<(const intrusive_ptr<TimerTask>& a,
const intrusive_ptr<TimerTask>& b)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h Fri Jan 29 22:59:09 2010
@@ -38,10 +38,11 @@
class Timer;
class TimerTask : public RefCounted {
- friend class Timer;
- friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
- const boost::intrusive_ptr<TimerTask>&);
+ friend class Timer;
+ friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
+ const boost::intrusive_ptr<TimerTask>&);
+ std::string name;
AbsTime sortTime;
Duration period;
AbsTime nextFireTime;
@@ -51,30 +52,26 @@
bool readyToFire() const;
void fireTask();
-public:
- QPID_COMMON_EXTERN TimerTask(Duration period);
- QPID_COMMON_EXTERN TimerTask(AbsTime fireTime);
+ public:
+ QPID_COMMON_EXTERN TimerTask(Duration period, const std::string& name=std::string());
+ QPID_COMMON_EXTERN TimerTask(AbsTime fireTime, const std::string& name=std::string());
QPID_COMMON_EXTERN virtual ~TimerTask();
QPID_COMMON_EXTERN void setupNextFire();
QPID_COMMON_EXTERN void restart();
QPID_COMMON_EXTERN void cancel();
-protected:
+ std::string getName() const { return name; }
+
+ protected:
// Must be overridden with callback
virtual void fire() = 0;
};
// For the priority_queue order
bool operator<(const boost::intrusive_ptr<TimerTask>& a,
- const boost::intrusive_ptr<TimerTask>& b);
-
-/**
- A timer to trigger tasks that are local to one broker.
+ const boost::intrusive_ptr<TimerTask>& b);
- For periodic tasks that should be synchronized across all brokers
- in a cluster, use qpid::sys::PeriodicTimer.
- */
class Timer : private Runnable {
qpid::sys::Monitor monitor;
std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks;
@@ -84,13 +81,17 @@
// Runnable interface
void run();
-public:
+ public:
QPID_COMMON_EXTERN Timer();
- QPID_COMMON_EXTERN ~Timer();
+ QPID_COMMON_EXTERN virtual ~Timer();
- QPID_COMMON_EXTERN void add(boost::intrusive_ptr<TimerTask> task);
- QPID_COMMON_EXTERN void start();
- QPID_COMMON_EXTERN void stop();
+ QPID_COMMON_EXTERN virtual void add(boost::intrusive_ptr<TimerTask> task);
+ QPID_COMMON_EXTERN virtual void start();
+ QPID_COMMON_EXTERN virtual void stop();
+
+ protected:
+ QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
+ QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
};
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Jan 29 22:59:09 2010
@@ -95,8 +95,12 @@
<field name="frame-seq" type="sequence-no"/>
</control>
- <!-- Synchronize periodic timer tasks across the cluster -->
- <control name="periodic-timer" code="0x15">
+ <!-- Synchronize timer tasks across the cluster -->
+ <control name="timer-wakeup" code="0x15">
+ <field name="name" type="str16"/>
+ </control>
+
+ <control name="timer-drop" code="0x16">
<field name="name" type="str16"/>
</control>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org