You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/05 21:24:42 UTC
svn commit: r750574 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Cluster.h ExpiryPolicy.cpp ExpiryPolicy.h
Author: aconway
Date: Thu Mar 5 20:24:41 2009
New Revision: 750574
URL: http://svn.apache.org/viewvc?rev=750574&view=rev
Log:
Cluster: restore separate event/frame threads.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Mar 5 20:24:41 2009
@@ -96,6 +96,7 @@
self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -106,12 +107,11 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
- eventId(0),
+ connections(*this),
frameId(0),
initialized(false),
state(INIT),
- connections(*this),
+ eventId(0),
lastSize(0),
lastBroker(false)
{
@@ -156,19 +156,15 @@
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
connections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- assert(state <= UPDATEE); // Only during update.
connections.insert(c);
}
void Cluster::erase(const ConnectionId& id) {
- // Called only by Connection::deliverClose in deliver thread with lock held.
connections.erase(id);
}
@@ -225,12 +221,11 @@
}
void Cluster::deliver(const Event& e) {
- if (state == LEFT) return;
- QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
-// Handler for deliverEventQueue
+// Handler for deliverEventQueue.
+// This thread executes cluster controls and decodes connection data events.
void Cluster::deliveredEvent(const Event& event) {
Event e(event);
Mutex::ScopedLock l(lock);
@@ -246,26 +241,34 @@
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) { // Handle connection frames
- if (e.getType() == CONTROL) {
+ if (e.getType() == CONTROL)
connectionFrame(EventFrame(e, e.getFrame()));
- }
else
connections.decode(e, e.getData());
}
// Drop connection frames while state < CATCHUP
}
-// Handler for deliverFrameQueue
+void Cluster::connectionFrame(const EventFrame& frame) {
+ deliverFrameQueue.push(frame);
+}
+
+// Handler for deliverFrameQueue.
+// This thread executes connection control and data frames.
void Cluster::deliveredFrame(const EventFrame& event) {
- Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ // No lock, only use connections, not Cluster state.
EventFrame e(event);
- assert(!e.isCluster()); // Only connection frames on this queue.
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Add cluster-id to to data frames.
- e.frame.setClusterId(frameId++);
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
- connection->deliveredFrame(e);
+ if(!e.frame.getBody()) { // marks the stall point, start the update task.
+ updateThread=Thread(*updateTask);
+ }
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.type == DATA) // Add cluster-id to to data frames.
+ e.frame.setClusterId(frameId++);
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ if (connection)
+ connection->deliveredFrame(e);
+ }
}
struct AddrList {
@@ -333,7 +336,7 @@
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
- setClusterId(true);
+ setClusterId(true, l);
setReady(l);
map = ClusterMap(self, myUrl, true);
memberUpdate(l);
@@ -358,8 +361,6 @@
}
}
-bool Cluster::isLeader() const { return elders.empty(); }
-
void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -374,11 +375,9 @@
// callbacks will be invoked.
//
void Cluster::brokerShutdown() {
- if (state != LEFT) {
- try { cpg.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(error, *this << " shutting down CPG: " << e.what());
- }
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
}
delete this;
}
@@ -401,10 +400,6 @@
// Stop processing the deliveredEventQueue in order to send or
// receive an update.
deliverEventQueue.stop();
-
- // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
- // also wait for it to be empty before we are stalled, so that
- // our local model is up-to-date to give an update.
}
void Cluster::unstall(Lock&) {
@@ -430,7 +425,7 @@
}
else if (updatee == self && url) {
assert(state == JOINER);
- setClusterId(uuid);
+ setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
stall(l);
@@ -444,16 +439,20 @@
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
stall(l);
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
- updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+ updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs));
+ cs);
+ // Push an empty frame onto the deliverFrameQueue to mark the stall point.
+ // The deliverFrameQueue thread will start the update at that point.
+ deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
}
// Called in update thread.
@@ -461,6 +460,7 @@
Lock l(lock);
updatedMap = m;
eventId = eventId_;
+ // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
frameId = frameId_;
checkUpdateIn(l);
}
@@ -602,7 +602,7 @@
}
}
-void Cluster::setClusterId(const Uuid& uuid) {
+void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
@@ -617,13 +617,4 @@
expiryPolicy->deliverExpire(id);
}
-void Cluster::connectionFrame(const EventFrame& frame) {
- // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition.
- // Measure performance impact & review.
- //
- // deliverFrameQueue.push(frame);
- //
- deliveredFrame(frame);
-}
-
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Mar 5 20:24:41 2009
@@ -66,25 +66,27 @@
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Connections;
- /** Construct the cluster in plugin earlyInitialize */
+ // Public functions are thread safe unless otherwise mentioned in a comment.
+
+ // Construct the cluster in plugin earlyInitialize.
Cluster(const ClusterSettings&, broker::Broker&);
virtual ~Cluster();
- /** Join the cluster in plugin initialize. Requires transport
- * plugins to be available.. */
+ // Called by plugin initialize: cluster start-up requires transport plugins.
+ // Thread safety: only called by plugin initialize.
void initialize();
- // Connection map - called in connection threads.
+ // Connection map.
void addLocalConnection(const ConnectionPtr&);
void addShadowConnection(const ConnectionPtr&);
void erase(const ConnectionId&);
- // URLs of current cluster members - called in connection threads.
+ // URLs of current cluster members.
std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
- // Leave the cluster - called in any thread.
+ // Leave the cluster - called when fatal errors occur.
void leave();
// Update completed - called in update thread
@@ -94,15 +96,13 @@
broker::Broker& getBroker() const;
Multicaster& getMulticast() { return mcast; }
- boost::function<bool ()> isQuorate;
- void checkQuorum(); // called in connection threads.
+ void checkQuorum();
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
- bool isLeader() const; // Called in deliver thread.
-
- // Called by Connection in deliver event thread with decoded connection data frames.
+ // Process a connection frame. Called by Connection with decoded frames.
+ // Thread safety: only called in deliverEventQueue thread.
void connectionFrame(const EventFrame&);
private:
@@ -111,11 +111,9 @@
typedef PollableQueue<Event> PollableEventQueue;
typedef PollableQueue<EventFrame> PollableFrameQueue;
- // NB: The final Lock& parameter on functions below is used to mark functions
- // that should only be called by a function that already holds the lock.
- // The parameter makes it hard to forget since you have to have an instance of
- // a Lock to call the unlocked functions.
-
+ // NB: A dummy Lock& parameter marks functions that must only be
+ // called with Cluster::lock locked.
+
void leave(Lock&);
std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
@@ -123,18 +121,19 @@
// Make an offer if we can - called in deliver thread.
void makeOffer(const MemberId&, Lock&);
- // Called in main thread in ~Broker.
+ // Called in main thread from Broker destructor.
void brokerShutdown();
// Cluster controls implement XML methods from cluster.xml.
- // Called in deliveredEvent thread.
- //
+ // Called in deliverEventQueue thread.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
+ // Helper, called by updateOffer.
+ void updateStart(const MemberId& updatee, const Url& url, Lock&);
// Used by cluster controls.
void stall(Lock&);
@@ -144,13 +143,6 @@
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
- // Helper, called in deliver thread.
- void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
- // Called in event deliver thread to check for update status.
- bool isUpdateComplete(const EventFrame&);
- bool isUpdateComplete();
-
void setReady(Lock&);
void deliver( // CPG deliver callback.
@@ -186,7 +178,7 @@
void updateOutError(const std::exception&);
void updateOutDone(Lock&);
- void setClusterId(const framing::Uuid&);
+ void setClusterId(const framing::Uuid&, Lock&);
// Immutable members set on construction, never changed.
ClusterSettings settings;
@@ -202,6 +194,7 @@
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
qpid::management::ManagementAgent* mAgent;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
// Thread safe members
Multicaster mcast;
@@ -210,11 +203,9 @@
PollableFrameQueue deliverFrameQueue;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
-
- // Used only in deliveredFrame thread
- ClusterMap::Set elders;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- uint64_t eventId; // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled.
+ ConnectionMap connections;
+
+ // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled.
uint64_t frameId;
// Used only during initialization
@@ -235,15 +226,16 @@
LEFT ///< Final state, left the cluster.
} state;
- ConnectionMap connections;
+ uint64_t eventId;
ClusterMap map;
+ ClusterMap::Set elders;
size_t lastSize;
bool lastBroker;
-
- // Update related
sys::Thread updateThread;
+ sys::Runnable* updateTask;
boost::optional<ClusterMap> updatedMap;
+
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Thu Mar 5 20:24:41 2009
@@ -30,8 +30,8 @@
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
- : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+ : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
namespace {
uint64_t clusterId(const broker::Message& m) {
@@ -65,8 +65,7 @@
void ExpiryPolicy::sendExpire(uint64_t id) {
sys::Mutex::ScopedLock l(lock);
- if (isLeader())
- mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
void ExpiryPolicy::deliverExpire(uint64_t id) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Thu Mar 5 20:24:41 2009
@@ -42,7 +42,7 @@
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+ ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
void willExpire(broker::Message&);
@@ -65,7 +65,6 @@
IdSet expired;
boost::intrusive_ptr<Expired> expiredPolicy;
- boost::function<bool()> isLeader;
Multicaster& mcast;
MemberId memberId;
broker::Timer& timer;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org