You are viewing a plain-text version of this content. The canonical link to the original version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/05 21:24:42 UTC

svn commit: r750574 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h ExpiryPolicy.cpp ExpiryPolicy.h

Author: aconway
Date: Thu Mar  5 20:24:41 2009
New Revision: 750574

URL: http://svn.apache.org/viewvc?rev=750574&view=rev
Log:
Cluster: restore separate event/frame threads.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Mar  5 20:24:41 2009
@@ -96,6 +96,7 @@
     self(cpg.self()),
     readMax(settings.readMax),
     writeEstimate(settings.writeEstimate),
+    expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -106,12 +107,11 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
-    expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
-    eventId(0),
+    connections(*this),
     frameId(0),
     initialized(false),
     state(INIT),
-    connections(*this),
+    eventId(0),
     lastSize(0),
     lastBroker(false)
 {
@@ -156,19 +156,15 @@
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    Lock l(lock);
     connections.insert(c);
 }
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    Lock l(lock);
-    assert(state <= UPDATEE);   // Only during update.
     connections.insert(c);
 }
 
 void Cluster::erase(const ConnectionId& id) {
-    // Called only by Connection::deliverClose in deliver thread with lock held.
     connections.erase(id);
 }
 
@@ -225,12 +221,11 @@
 }
 
 void Cluster::deliver(const Event& e) {
-    if (state == LEFT) return;
-    QPID_LATENCY_INIT(e);
     deliverEventQueue.push(e);
 }
 
-// Handler for deliverEventQueue
+// Handler for deliverEventQueue.
+// This thread executes cluster controls and decodes connection data events.
 void Cluster::deliveredEvent(const Event& event) {
     Event e(event);
     Mutex::ScopedLock l(lock);
@@ -246,26 +241,34 @@
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
     else if (state >= CATCHUP) { // Handle connection frames  
-        if (e.getType() == CONTROL) {
+        if (e.getType() == CONTROL) 
             connectionFrame(EventFrame(e, e.getFrame()));
-        }
         else
             connections.decode(e, e.getData());
     }
     // Drop connection frames while state < CATCHUP
 }
 
-// Handler for deliverFrameQueue
+void Cluster::connectionFrame(const EventFrame& frame) {
+    deliverFrameQueue.push(frame);
+}
+
+// Handler for deliverFrameQueue.
+// This thread executes connection control and data frames.
 void Cluster::deliveredFrame(const EventFrame& event) {
-    Mutex::ScopedLock l(lock);  // TODO aconway 2009-03-02: don't need this lock?
+    // No lock, only use connections, not Cluster state.
     EventFrame e(event);
-    assert(!e.isCluster());     // Only connection frames on this queue.
-    QPID_LOG(trace, *this << " DLVR:  " << e);
-    if (e.type == DATA)         // Add cluster-id to to data frames.
-        e.frame.setClusterId(frameId++);
-    boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
-    if (connection)      // Ignore frames to closed local connections.
-        connection->deliveredFrame(e);
+    if(!e.frame.getBody()) {    // marks the stall point, start the update task.
+        updateThread=Thread(*updateTask);
+    }
+    else {
+        QPID_LOG(trace, *this << " DLVR:  " << e);
+        if (e.type == DATA)         // Add cluster-id to to data frames.
+            e.frame.setClusterId(frameId++); 
+        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+        if (connection)
+            connection->deliveredFrame(e);
+    }
 }
   
 struct AddrList {
@@ -333,7 +336,7 @@
 
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
-            setClusterId(true);
+            setClusterId(true, l);
             setReady(l);
             map = ClusterMap(self, myUrl, true);
             memberUpdate(l);
@@ -358,8 +361,6 @@
     }
 }
 
-bool Cluster::isLeader() const { return elders.empty(); }
-
 void Cluster::makeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
@@ -374,11 +375,9 @@
 // callbacks will be invoked.
 // 
 void Cluster::brokerShutdown()  {
-    if (state != LEFT) {
-        try { cpg.shutdown(); }
-        catch (const std::exception& e) {
-            QPID_LOG(error, *this << " shutting down CPG: " << e.what());
-        }
+    try { cpg.shutdown(); }
+    catch (const std::exception& e) {
+        QPID_LOG(error, *this << " shutting down CPG: " << e.what());
     }
     delete this;
 }
@@ -401,10 +400,6 @@
     // Stop processing the deliveredEventQueue in order to send or
     // recieve an update.
     deliverEventQueue.stop();
-
-    // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
-    // also wait for it to be empty before we are stalled, so that
-    // our local model is up-to-date to give an update.
 }
 
 void Cluster::unstall(Lock&) {
@@ -430,7 +425,7 @@
     }
     else if (updatee == self && url) {
         assert(state == JOINER);
-        setClusterId(uuid);
+        setClusterId(uuid, l);
         state = UPDATEE;
         QPID_LOG(info, *this << " receiving update from " << updater);
         stall(l);
@@ -444,16 +439,20 @@
     state = UPDATER;
     QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
     stall(l);
-    if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+
+    if (updateThread.id())
+        updateThread.join(); // Join the previous updateThread to avoid leaks.
     client::ConnectionSettings cs;
     cs.username = settings.username;
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
-    updateThread = Thread(
-        new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+    updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
-                         cs));
+                         cs);
+    // Push an empty frame onto the deliverFrameQueue to mark the stall point.
+    // The deliverFrameQueue thread will start the update at that point.
+    deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
 }
 
 // Called in update thread.
@@ -461,6 +460,7 @@
     Lock l(lock);
     updatedMap = m;
     eventId = eventId_;
+    // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
     frameId = frameId_;
     checkUpdateIn(l);
 }
@@ -602,7 +602,7 @@
     }
 }
 
-void Cluster::setClusterId(const Uuid& uuid) {
+void Cluster::setClusterId(const Uuid& uuid, Lock&) {
     clusterId = uuid;
     if (mgmtObject) {
         stringstream stream;
@@ -617,13 +617,4 @@
     expiryPolicy->deliverExpire(id);
 }
 
-void Cluster::connectionFrame(const EventFrame& frame) {
-    // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition.
-    // Measure performance impact & review.
-    // 
-    // deliverFrameQueue.push(frame);
-    // 
-    deliveredFrame(frame);
-}
-
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Mar  5 20:24:41 2009
@@ -66,25 +66,27 @@
     typedef boost::intrusive_ptr<Connection> ConnectionPtr;
     typedef std::vector<ConnectionPtr> Connections;
 
-    /** Construct the cluster in plugin earlyInitialize */ 
+    // Public functions are thread safe unless otherwise mentioned in a comment.
+
+    // Construct the cluster in plugin earlyInitialize.
     Cluster(const ClusterSettings&, broker::Broker&);
     virtual ~Cluster();
 
-    /** Join the cluster in plugin initialize. Requires transport
-     * plugins to be available.. */
+    // Called by plugin initialize: cluster start-up requires transport plugins .
+    // Thread safety: only called by plugin initialize.
     void initialize();
 
-    // Connection map - called in connection threads.
+    // Connection map.
     void addLocalConnection(const ConnectionPtr&); 
     void addShadowConnection(const ConnectionPtr&); 
     void erase(const ConnectionId&);       
     
-    // URLs of current cluster members - called in connection threads.
+    // URLs of current cluster members.
     std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
     boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
 
-    // Leave the cluster - called in any thread.
+    // Leave the cluster - called when fatal errors occur.
     void leave();
 
     // Update completed - called in update thread
@@ -94,15 +96,13 @@
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
 
-    boost::function<bool ()> isQuorate;
-    void checkQuorum();         // called in connection threads.
+    void checkQuorum();
 
     size_t getReadMax() { return readMax; }
     size_t getWriteEstimate() { return writeEstimate; }
 
-    bool isLeader() const;       // Called in deliver thread.
-
-    // Called by Connection in deliver event thread with decoded connection data frames.
+    // Process a connection frame. Called by Connection with decoded frames.
+    // Thread safety: only called in deliverEventQueue thread.
     void connectionFrame(const EventFrame&); 
 
   private:
@@ -111,11 +111,9 @@
     typedef PollableQueue<Event> PollableEventQueue;
     typedef PollableQueue<EventFrame> PollableFrameQueue;
 
-    // NB: The final Lock& parameter on functions below is used to mark functions
-    // that should only be called by a function that already holds the lock.
-    // The parameter makes it hard to forget since you have to have an instance of
-    // a Lock to call the unlocked functions.
-
+    // NB: A dummy Lock& parameter marks functions that must only be
+    // called with Cluster::lock  locked.
+ 
     void leave(Lock&);
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
@@ -123,18 +121,19 @@
     // Make an offer if we can - called in deliver thread.
     void makeOffer(const MemberId&, Lock&);
 
-    // Called in main thread in ~Broker.
+    // Called in main thread from Broker destructor.
     void brokerShutdown();
 
     // Cluster controls implement XML methods from cluster.xml.
-    // Called in deliveredEvent thread.
-    // 
+    // Called in deliverEventQueue thread.
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
+    // Helper, called by updateOffer.
+    void updateStart(const MemberId& updatee, const Url& url, Lock&);
 
     // Used by cluster controls.
     void stall(Lock&);
@@ -144,13 +143,6 @@
     void deliveredEvent(const Event&); 
     void deliveredFrame(const EventFrame&); 
 
-    // Helper, called in deliver thread.
-    void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
-    // Called in event deliver thread to check for update status.
-    bool isUpdateComplete(const EventFrame&);
-    bool isUpdateComplete();
-
     void setReady(Lock&);
 
     void deliver( // CPG deliver callback. 
@@ -186,7 +178,7 @@
     void updateOutError(const std::exception&);
     void updateOutDone(Lock&);
 
-    void setClusterId(const framing::Uuid&);
+    void setClusterId(const framing::Uuid&, Lock&);
 
     // Immutable members set on construction, never changed.
     ClusterSettings settings;
@@ -202,6 +194,7 @@
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     qpid::management::ManagementAgent* mAgent;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 
     // Thread safe members
     Multicaster mcast;
@@ -210,11 +203,9 @@
     PollableFrameQueue deliverFrameQueue;
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
-
-    // Used only in deliveredFrame thread
-    ClusterMap::Set elders;
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    uint64_t eventId;           // FIXME aconway 2009-03-04: review use for thread safety frame-q thread re-enabled.
+    ConnectionMap connections;
+ 
+    // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled.
     uint64_t frameId;
 
     // Used only during initialization
@@ -235,15 +226,16 @@
         LEFT     ///< Final state, left the cluster.
     } state;
 
-    ConnectionMap connections;
+    uint64_t eventId;
     ClusterMap map;
+    ClusterMap::Set elders;
     size_t lastSize;
     bool lastBroker;
-
-    //     Update related
     sys::Thread updateThread;
+    sys::Runnable* updateTask;
     boost::optional<ClusterMap> updatedMap;
 
+
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Thu Mar  5 20:24:41 2009
@@ -30,8 +30,8 @@
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
-    : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+    : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
 namespace {
 uint64_t clusterId(const broker::Message& m) {
@@ -65,8 +65,7 @@
 
 void ExpiryPolicy::sendExpire(uint64_t id) {
     sys::Mutex::ScopedLock l(lock);
-    if (isLeader()) 
-        mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+    mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
 }
 
 void ExpiryPolicy::deliverExpire(uint64_t id) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=750574&r1=750573&r2=750574&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Thu Mar  5 20:24:41 2009
@@ -42,7 +42,7 @@
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+    ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
 
     void willExpire(broker::Message&);
 
@@ -65,7 +65,6 @@
 
     IdSet expired;
     boost::intrusive_ptr<Expired> expiredPolicy;
-    boost::function<bool()> isLeader;
     Multicaster& mcast;
     MemberId memberId;
     broker::Timer& timer;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org