You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:21:29 UTC
svn commit: r903869 - in /qpid/trunk/qpid/cpp: include/qpid/sys/ src/
src/qpid/broker/ src/qpid/cluster/
Author: aconway
Date: Wed Jan 27 22:21:28 2010
New Revision: 903869
URL: http://svn.apache.org/viewvc?rev=903869&view=rev
Log:
QPID_2634 Management updates in timer create inconsistencies in a cluster.
Cluster plugin provides a PeriodicTimer implementation to the broker
which executes tasks in the cluster dispatch thread simultaneously
across the cluster.
Added:
qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
Modified:
qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/sys/Time.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/sys/Time.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/sys/Time.h Wed Jan 27 22:21:28 2010
@@ -122,7 +122,7 @@
friend class AbsTime;
public:
- QPID_COMMON_EXTERN inline Duration(int64_t time0);
+ QPID_COMMON_EXTERN inline Duration(int64_t time0 = 0);
QPID_COMMON_EXTERN explicit Duration(const AbsTime& time0);
QPID_COMMON_EXTERN explicit Duration(const AbsTime& start, const AbsTime& finish);
inline operator int64_t() const;
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Jan 27 22:21:28 2010
@@ -733,6 +733,7 @@
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/ConnectionFactory.cpp
+ qpid/broker/DelegatingPeriodicTimer.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
qpid/broker/DirectExchange.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jan 27 22:21:28 2010
@@ -508,6 +508,8 @@
qpid/broker/Consumer.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
+ qpid/broker/DelegatingPeriodicTimer.h \
+ qpid/broker/DelegatingPeriodicTimer.cpp \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliverableMessage.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jan 27 22:21:28 2010
@@ -137,7 +137,6 @@
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
- periodicTimer(new sys::PeriodicTimerImpl(timer)),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
store(new NullMessageStore),
@@ -258,6 +257,12 @@
// Initialize plugins
Plugin::initializeAll(*this);
+ if (!periodicTimer.hasDelegate()) {
+ // If no plugin has contributed a PeriodicTimer, use the default one.
+ periodicTimer.setDelegate(
+ std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
+ }
+
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
@@ -469,6 +474,10 @@
return knownBrokers;
}
+void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
+ periodicTimer.setDelegate(pt);
+}
+
const std::string Broker::TCP_TRANSPORT("tcp");
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Jan 27 22:21:28 2010
@@ -25,6 +25,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/DelegatingPeriodicTimer.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -49,7 +50,6 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Timer.h"
-#include "qpid/sys/PeriodicTimer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
@@ -147,7 +147,7 @@
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
- std::auto_ptr<sys::PeriodicTimer> periodicTimer;
+ DelegatingPeriodicTimer periodicTimer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -255,8 +255,8 @@
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
sys::Timer& getTimer() { return timer; }
- sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; }
- void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; }
+ sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
+ void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
boost::function<std::vector<Url> ()> getKnownBrokers;
Added: qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp?rev=903869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp Wed Jan 27 22:21:28 2010
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DelegatingPeriodicTimer.h"
+
+namespace qpid {
+namespace broker {
+
+DelegatingPeriodicTimer::DelegatingPeriodicTimer() {}
+
+void DelegatingPeriodicTimer::add(
+ const Task& task, sys::Duration period, const std::string& taskName)
+{
+ if (delegate.get())
+ delegate->add(task, period, taskName);
+ else {
+ Entry e;
+ e.task = task;
+ e.period = period;
+ e.name = taskName;
+ entries.push_back(e);
+ }
+}
+
+void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
+ assert(impl.get());
+ assert(!delegate.get());
+ delegate = impl;
+ for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
+ delegate->add(i->task, i->period, i->name);
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h?rev=903869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h Wed Jan 27 22:21:28 2010
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_PERIODICTIMER_H
+#define QPID_BROKER_PERIODICTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PeriodicTimer.h"
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A PeriodicTimer implementation that delegates to another PeriodicTimer.
+ *
+ * Tasks added while there is no delegate timer are stored.
+ * When a delgate timer is set, stored tasks are added to it.
+ */
+class DelegatingPeriodicTimer : public sys::PeriodicTimer
+{
+ public:
+ DelegatingPeriodicTimer();
+ /** Add a task: if no delegate, store it. When delegate set, add stored tasks */
+ void add(const Task& task, sys::Duration period, const std::string& taskName);
+ /** Set the delegate, transfers ownership of delegate. */
+ void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
+ bool hasDelegate() { return delegate.get(); }
+ private:
+ struct Entry { Task task; sys::Duration period; std::string name; };
+ typedef std::vector<Entry> Entries;
+ std::auto_ptr<PeriodicTimer> delegate;
+ Entries entries;
+
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PERIODICTIMER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 27 22:21:28 2010
@@ -250,9 +250,10 @@
updateRetracted(false),
error(*this)
{
- // FIXME aconway 2010-01-26: must be done before management registers with timer.
- broker.setPeriodicTimer(
- std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this)));
+ // We give ownership of the timer to the broker and keep a plain pointer.
+ // This is OK as it means the timer has the same lifetime as the broker.
+ timer = new PeriodicTimerImpl(*this);
+ broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -448,8 +449,8 @@
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& efConst) {
- sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
Mutex::ScopedLock l(lock);
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
@@ -961,13 +962,13 @@
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) {
- // FIXME aconway 2010-01-26:
+void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
+ timer->deliver(name);
}
bool Cluster::isElder() const {
- Mutex::ScopedLock l(lock);
- return elders.empty();
+ Monitor::ScopedLock l(lock);
+ return state >= CATCHUP && elders.empty();
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 27 22:21:28 2010
@@ -63,6 +63,7 @@
class Connection;
class EventFrame;
+class PeriodicTimerImpl;
/**
* Connection to the cluster
@@ -271,6 +272,7 @@
bool updateRetracted;
ErrorCheck error;
UpdateReceiver updateReceiver;
+ PeriodicTimerImpl* timer;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp Wed Jan 27 22:21:28 2010
@@ -32,20 +32,24 @@
Cluster& c, const Task& t, sys::Duration d, const std::string& n)
: TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
task(t), name(n), inFlight(false)
-{}
+{
+ timer.add(this);
+}
void PeriodicTimerImpl::TaskEntry::fire() {
+ setupNextFire();
+ timer.add(this);
+ bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
sys::Mutex::ScopedLock l(lock);
// Only the elder mcasts.
// Don't mcast another if we haven't yet received the last one.
- if (cluster.isElder() && !inFlight) {
+ if (isElder && !inFlight) {
+ QPID_LOG(trace, "Sending periodic-timer control for " << name);
inFlight = true;
cluster.getMulticast().mcastControl(
framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
cluster.getId());
}
- setupNextFire();
- timer.add(this);
}
void PeriodicTimerImpl::TaskEntry::deliver() {
@@ -59,6 +63,7 @@
const Task& task, sys::Duration period, const std::string& name)
{
sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Periodic timer add entry for " << name);
if (map.find(name) != map.end())
throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
map[name] = new TaskEntry(cluster, task, period, name);
@@ -72,6 +77,7 @@
if (i == map.end())
throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
}
+ QPID_LOG(debug, "Periodic timer execute " << name);
i->second->deliver();
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org